Re: Where to specify trust.jks
Hello. Another solution might be putting it inside a custom container. I use this for testing ElasticsearchIO with a TLS session.

https://beam.apache.org/documentation/runtime/environments/
https://cloud.google.com/dataflow/docs/guides/using-custom-containers

Below is a Dockerfile example:
https://github.com/yuwtennis/apache-beam-pipeline-apps/blob/main/java/Dockerfile

Thanks,
Yu Watanabe

On Fri, May 19, 2023 at 12:28 AM Pablo Estrada via user wrote:
> Hi Utkarsh,
> you can pass a path in GCS (or a filesystem), and the workers should be able
> to download it onto themselves. You'd pass
> `gs://my-bucket-name/path/to/trust.jks`. Can you try that?
> Best
> -P.
>
> On Wed, May 10, 2023 at 1:58 PM Utkarsh Parekh wrote:
>>
>> Hi,
>>
>> I'm testing a streaming app using Kafka, Dataflow, and Apache Beam [Python].
>>
>> "Error message from worker: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed
>> to construct kafka consumer
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:888)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:825)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
>> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>> org.apache.beam.sdk.io.kafka.KafkaUnbo
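For reference, a minimal sketch of where the truststore ends up being used, whichever way it is distributed: a path baked into a custom SDK container as in the Dockerfile approach above, or a path the workers fetch as Pablo suggests. The broker address, paths, and password below are placeholders:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    records = p | ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'broker:9093',
            'security.protocol': 'SSL',
            # Either a file baked into the custom container image, or a
            # gs://... path if your setup stages it onto the workers.
            'ssl.truststore.location': '/opt/certs/trust.jks',
            'ssl.truststore.password': 'changeit',
        },
        topics=['my-topic'])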
Re: How to run an expansion service using the Go SDK in a local development environment?
Hello Danny. Ah, I see. Thank you for your advice.

Thanks,
Yu Watanabe

On Mon, Aug 29, 2022 at 9:26 AM Danny McCormick via user wrote:
> Hey Yu, as the error you posted suggests, the Go direct runner which you're
> using in your local development environment doesn't support external
> transforms using an expansion service. If you're going to do an x-lang
> transform using an expansion service you should use a different runner like
> Dataflow, Flink, Spark, or one of the other runners listed here -
> https://beam.apache.org/documentation/runners/capability-matrix/
>
> Thanks,
> Danny
>
> On Sun, Aug 28, 2022 at 7:50 AM Yu Watanabe wrote:
>>
>> Hello.
>>
>> I would like to ask a question about the expansion service. I'm currently
>> testing my expansion service in my local development environment.
>> I have read the notes about Kafka in advance:
>>
>> https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go#L93
>>
>> I have prepared the SDK containers.
>>
>> [ywatanabe@laptop-archlinux Development]$ docker image ls | grep apache
>> apache/beam_java8_sdk 2.42.0.dev f7e9d38b01fe 11 days ago 643MB
>> apache/beam_go_sdk latest 8a87ea45255b 11 days ago 149MB
>>
>> However, when I run the code in my local environment, I get an error.
>>
>> [ywatanabe@laptop-archlinux go]$ go run ./examples/elasticsearch/sample.go \
>> --runner direct \
>> --sdk_harness_container_image_override ".*java.*,apache/beam_java8_sdk:2.42.0.dev"
>> Hello world.
>> 2022/08/28 20:39:01 Executing pipeline with the direct runner.
>> 2022/08/28 20:39:01 Pipeline:
>> 2022/08/28 20:39:01 Nodes: {1: []uint8/bytes GLO}
>> {2: string/string GLO}
>> {3: []uint8/bytes GLO}
>> {4: []uint8/bytes GLO}
>> Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
>> 2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
>> 3: External [In(Main): string <- {2: string/string GLO}] -> [Out: []uint8 -> {3: []uint8/bytes GLO} Out: []uint8 -> {4: []uint8/bytes GLO}]
>> Pipeline failed: translation failed
>> caused by:
>> external transforms like 3: External [In(Main): string <- {2: string/string GLO}] -> [Out: []uint8 -> {3: []uint8/bytes GLO} Out: []uint8 -> {4: []uint8/bytes GLO}] are not supported in the Go direct runner, please execute your pipel[ywatanabe@laptop-archlinux go]$
>>
>> Am I missing something?
>>
>> My main and io code can be found below.
>>
>> https://gist.github.com/yuwtennis/dec3bf3bfc0c4fa54d9d3565c98d008e
>>
>> Thanks,
>> Yu
>>
>> --
>> Yu Watanabe
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter: twitter.com/yuwtennis

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
How to run an expansion service using the Go SDK in a local development environment?
Hello.

I would like to ask a question about the expansion service. I'm currently testing my expansion service in my local development environment. I have read the notes about Kafka in advance:

https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go#L93

I have prepared the SDK containers.

[ywatanabe@laptop-archlinux Development]$ docker image ls | grep apache
apache/beam_java8_sdk 2.42.0.dev f7e9d38b01fe 11 days ago 643MB
apache/beam_go_sdk latest 8a87ea45255b 11 days ago 149MB

However, when I run the code in my local environment, I get an error.

[ywatanabe@laptop-archlinux go]$ go run ./examples/elasticsearch/sample.go \
--runner direct \
--sdk_harness_container_image_override ".*java.*,apache/beam_java8_sdk:2.42.0.dev"
Hello world.
2022/08/28 20:39:01 Executing pipeline with the direct runner.
2022/08/28 20:39:01 Pipeline:
2022/08/28 20:39:01 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: []uint8/bytes GLO}
{4: []uint8/bytes GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
3: External [In(Main): string <- {2: string/string GLO}] -> [Out: []uint8 -> {3: []uint8/bytes GLO} Out: []uint8 -> {4: []uint8/bytes GLO}]
Pipeline failed: translation failed
caused by:
external transforms like 3: External [In(Main): string <- {2: string/string GLO}] -> [Out: []uint8 -> {3: []uint8/bytes GLO} Out: []uint8 -> {4: []uint8/bytes GLO}] are not supported in the Go direct runner, please execute your pipel[ywatanabe@laptop-archlinux go]$

Am I missing something?

My main and io code can be found below.

https://gist.github.com/yuwtennis/dec3bf3bfc0c4fa54d9d3565c98d008e

Thanks,
Yu

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Re: How to register as an external cross-language transform?
Cham. Thank you for the advice. It worked! I see it now!

beam:transform:org.apache.beam:elasticsearch_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5f049ea1

Thanks,
Yu

On Thu, Aug 18, 2022 at 6:28 AM Chamikara Jayalath via user wrote:
>
> On Wed, Aug 17, 2022 at 3:05 AM Yu Watanabe wrote:
>>
>> Hello.
>>
>> I am trying to write code for a cross-language transform for
>> ElasticsearchIO but am having trouble with it.
>> I would appreciate it if I could get help.
>>
>> As described in the doc, and also referencing KafkaIO,
>>
>> https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms
>>
>> I have annotated the code with @AutoService, but
>> 'elasticsearch_write' does not appear when starting the expansion service.
>>
>> https://github.com/yuwtennis/beam/blob/feat/elasticsearch-io-cross-lang/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1991
>>
>> Below is a snippet of the command-line operation.
>>
>> https://gist.github.com/yuwtennis/3c55ca45b31f9a302daf7b827a842ef6
>>
>> I have built the jar files with the below command in advance.
>>
>> ./gradlew :sdks:java [jar]
>>
>> Would there be any other instructions I should be referencing?
>
> It's because io-expansion-service currently does not include the ElasticSearch
> module as a dependency:
> https://github.com/apache/beam/blob/master/sdks/java/io/expansion-service/build.gradle
> Can you try adding it there and re-building the jar?
>
> Thanks,
> Cham
>
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter: twitter.com/yuwtennis

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
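For completeness, a hedged sketch of what calling the transform from the Python side could then look like. The URN is the one printed above; the payload fields and input elements are hypothetical and must match whatever the Java builder's configuration class actually expects:

import apache_beam as beam
from apache_beam.transforms.external import ExternalTransform, ImplicitSchemaPayloadBuilder

URN = 'beam:transform:org.apache.beam:elasticsearch_write:v1'

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([b'{"field": "value"}'])
        | ExternalTransform(
            URN,
            # Hypothetical configuration fields, for illustration only.
            ImplicitSchemaPayloadBuilder({'addresses': 'http://localhost:9200', 'index': 'beam'}),
            'localhost:8097'))  # address of the running expansion service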
How to register as an external cross-language transform?
Hello.

I am trying to write code for a cross-language transform for ElasticsearchIO but am having trouble with it. I would appreciate it if I could get help.

As described in the doc, and also referencing KafkaIO,

https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms

I have annotated the code with @AutoService, but 'elasticsearch_write' does not appear when starting the expansion service.

https://github.com/yuwtennis/beam/blob/feat/elasticsearch-io-cross-lang/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1991

Below is a snippet of the command-line operation.

https://gist.github.com/yuwtennis/3c55ca45b31f9a302daf7b827a842ef6

I have built the jar files with the below command in advance.

./gradlew :sdks:java [jar]

Would there be any other instructions I should be referencing?

Best Regards,
Yu Watanabe

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Re: Any guideline for building a golang connector?
Hello Danny. Thank you for the details. I appreciate your message. I am a newbie at building IOs, so I will first look into the links and build up my knowledge.

Thanks,
Yu Watanabe

On Fri, Jul 8, 2022 at 8:28 PM Danny McCormick via user <user@beam.apache.org> wrote:

> Hey Yu,
>
> The guidance on that page should generally apply for Go as well, though we
> are missing an example transform; I filed
> https://github.com/apache/beam/issues/22194 to fix this, but a couple
> examples are our textio implementation
> <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/textio/textio.go>
> and this native streaming example
> <https://github.com/apache/beam/blob/master/sdks/go/examples/native_wordcap/nativepubsubio/native.go>
> (non-productionized, but it shows a lot of good concepts for handling
> streaming sources).
>
> Another option would be to use Java's elasticsearch implementation
> <https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java>
> with a cross-language transform. If you're thinking of contributing this
> back to the main Beam code base, I'd probably recommend that approach. In
> general, we're currently investing most heavily in cross-language
> transforms because it's a much lower burden to build/support, since it
> reuses the main components of the original transform. There are several
> examples of wrapped cross-language transforms in
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/io/xlang. If
> you have specific questions about authoring IOs that aren't answered in the
> docs/by example, feel free to drop them in this thread as well!
>
> Thanks,
> Danny
>
> On Fri, Jul 8, 2022 at 3:51 AM Yu Watanabe wrote:
>
>> Hello.
>>
>> Is there any guideline for building a Go SDK connector?
>> I was reviewing the document but could not find one for Golang.
>>
>> https://beam.apache.org/documentation/io/developing-io-overview/
>>
>> I was thinking of building one for Elasticsearch.
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter: twitter.com/yuwtennis

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Any guideline for building a golang connector?
Hello.

Is there any guideline for building a Go SDK connector? I was reviewing the document but could not find one for Golang.

https://beam.apache.org/documentation/io/developing-io-overview/

I was thinking of building one for Elasticsearch.

Thanks,
Yu Watanabe

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Re: Is FileIO for Azure Blob Storage GA?
Hello Brian. Thank you for the follow-up.

Ikegami-san, as a reference, I will also link the PR for the Python implementation, which was released in 2.25.0:

https://issues.apache.org/jira/browse/BEAM-6807
https://github.com/apache/beam/pull/12492

However, it is still not on the built-in FileSystem I/O list, so I am not sure about its release state either:

https://beam.apache.org/documentation/io/built-in/

Thanks,
Yu

On Wed, Dec 22, 2021 at 9:38 AM Brian Hulette wrote:

> The package is marked experimental [1], which is how we indicate a feature
> isn't GA and may change. That being said, a lot of important Beam features
> are still marked experimental but are actually quite solid and stable, so
> there's some nuance here.
>
> @Pablo Estrada may be able to comment more since he
> was involved with the initial contribution, but he may be out for the
> holidays for a little while.
>
> [1] https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/azure/blobstore/package-summary.html
>
> On Tue, Dec 21, 2021 at 4:47 AM Yu Watanabe wrote:
>
>> Hello.
>>
>> Looks like the unit tests pass in these PRs:
>> https://issues.apache.org/jira/browse/BEAM-10378
>>
>> But the integration test with the WordCount example has not passed yet.
>>
>> https://github.com/apache/beam/pull/12649
>>
>> Since the AzureBlobStoreFileSystemRegistrar class is available in 2.34.0,
>> you could test it in your prototype environment.
>>
>> https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.html
>>
>> Thanks,
>> Yu
>>
>> The FileSystem class for Azure Blob Storage is not yet listed in the official doc:
>> https://beam.apache.org/documentation/io/built-in/
>>
>> On Tue, Dec 21, 2021 at 10:50 AM 池上有希乃 wrote:
>>
>>> Hi
>>>
>>> I think I will use Apache Beam for my business.
>>> So there is a question:
>>> is FileIO for Azure Blob Storage in the GA phase or not?
>>>
>>> Thanks,
>>> Yukino Ikegami
>>
>> --
>> Yu Watanabe
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter: twitter.com/yuwtennis

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
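For anyone who wants to try the Python side, a hedged sketch assuming the Azure filesystem from BEAM-6807 is installed (apache-beam with the Azure extra, SDK 2.25.0 or later); the azfs:// scheme should then be usable anywhere Beam accepts a file path. Account, container, and blob names are placeholders, and authentication still has to be configured separately:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText('azfs://myaccount/mycontainer/input.txt')
        | beam.io.WriteToText('azfs://myaccount/mycontainer/output')
    )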
Re: Is FileIO for Azure Blob Storage GA?
Hello.

Looks like the unit tests pass in these PRs:
https://issues.apache.org/jira/browse/BEAM-10378

But the integration test with the WordCount example has not passed yet.

https://github.com/apache/beam/pull/12649

Since the AzureBlobStoreFileSystemRegistrar class is available in 2.34.0, you could test it in your prototype environment.

https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.html

Thanks,
Yu

The FileSystem class for Azure Blob Storage is not yet listed in the official doc:
https://beam.apache.org/documentation/io/built-in/

On Tue, Dec 21, 2021 at 10:50 AM 池上有希乃 wrote:
> Hi
>
> I think I will use Apache Beam for my business.
> So there is a question:
> is FileIO for Azure Blob Storage in the GA phase or not?
>
> Thanks,
> Yukino Ikegami

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Re: java.io.InvalidClassException with Spark 3.1.2
Sure. I starred your repository.

On Sat, Aug 21, 2021 at 11:27 AM cw wrote:
> Hello Yu,
> I have done a lot of testing; it only works for Spark 2.x, not 3. If you need a
> working example on Kubernetes, see
> https://github.com/cometta/python-apache-beam-spark . Feel free to improve the
> code if you would like to contribute, and please star it if it is useful for
> you. Thank you.
>
> On Monday, August 16, 2021, 12:37:46 AM GMT+8, Yu Watanabe <
> yu.w.ten...@gmail.com> wrote:
>
> Hello.
>
> I would like to ask a question about the Spark runner.
>
> Using Spark downloaded from the below link,
>
> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
>
> I get the below error when submitting a pipeline.
> The full error is at
> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
>
> --
> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601
> java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible:
> stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
> at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
> ...
> --
>
> The job service and SDK harness are deployed as below.
>
> 1. Job service
>
> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
>
> 2. SDK harness
>
> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
>
> * apache/beam_spark_job_server:2.31.0 for Spark 2.4.8
>
> 3. SDK client code
>
> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
>
> Spark 2.4.8 succeeded without any errors using the above components.
>
> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
>
> Would there be any setting I need to be aware of for Spark 3.1.2?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter: twitter.com/yuwtennis

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Re: java.io.InvalidClassException with Spark 3.1.2
Kyle. Thank you.

On Tue, Aug 17, 2021 at 5:55 AM Kyle Weaver wrote:
> I was able to reproduce the error. I'm not sure why this would happen,
> since as far as I can tell the Beam 2.31.0 Spark runner should be using
> Spark 3.1.2 and Scala 2.12 [1]. I filed a JIRA issue for it. [2]
>
> [1] https://github.com/apache/beam/pull/14897/commits/b6fca2bb79d9e7a69044b477460445456720ec58
> [2] https://issues.apache.org/jira/browse/BEAM-12762
>
> On Sun, Aug 15, 2021 at 9:37 AM Yu Watanabe wrote:
>>
>> Hello.
>>
>> I would like to ask a question about the Spark runner.
>>
>> Using Spark downloaded from the below link,
>>
>> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
>>
>> I get the below error when submitting a pipeline.
>> The full error is at
>> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
>>
>> --
>> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601
>> java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible:
>> stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
>> at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
>> ...
>> --
>>
>> The job service and SDK harness are deployed as below.
>>
>> 1. Job service
>>
>> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
>> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
>>
>> 2. SDK harness
>>
>> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
>>
>> * apache/beam_spark_job_server:2.31.0 for Spark 2.4.8
>>
>> 3. SDK client code
>>
>> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
>>
>> Spark 2.4.8 succeeded without any errors using the above components.
>>
>> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
>>
>> Would there be any setting I need to be aware of for Spark 3.1.2?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter: twitter.com/yuwtennis

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
java.io.InvalidClassException with Spark 3.1.2
Hello.

I would like to ask a question about the Spark runner.

Using Spark downloaded from the below link,

https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

I get the below error when submitting a pipeline. The full error is at https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.

--
21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
...
--

The job service and SDK harness are deployed as below.

1. Job service

sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true

2. SDK harness

sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool

* apache/beam_spark_job_server:2.31.0 for Spark 2.4.8

3. SDK client code

https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2

Spark 2.4.8 succeeded without any errors using the above components.

https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

Would there be any setting I need to be aware of for Spark 3.1.2?

Thanks,
Yu Watanabe

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
Re: Submit Python Beam on Spark Dataproc
Hello Mahan. Sorry for the late reply.

> Still waiting for startup of environment from localhost:5 for worker id 1-1

From the message, it seems that something is wrong with the connection between the worker node in the Spark cluster and the SDK harness.

According to this slide, the runner worker (in your context, the Spark worker) should also have connectivity with the SDK harness container.

https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_0

Could you please also try setting up SSH tunneling to the Spark worker node as well?

Thanks,
Yu

On Thu, Aug 12, 2021 at 9:07 PM Mahan Hosseinzadeh wrote:
> Thanks Yu for the help and the tips.
>
> I ran the following steps but my job is stuck and can't get submitted to
> Dataproc, and I keep getting this message in the job server:
> Still waiting for startup of environment from localhost:5 for worker id 1-1
>
> -----
> *Beam code:*
> pipeline_options = PipelineOptions([
> "--runner=PortableRunner",
> "--job_endpoint=localhost:8099",
> "--environment_type=EXTERNAL",
> "--environment_config=localhost:5"
> ])
>
> -----
> *Job Server:*
> I couldn't use Docker because host networking doesn't work on Mac OS, and I
> used Gradle instead:
>
> ./gradlew :runners:spark:3:job-server:runShadow
>
> -----
> *Beam Worker Pool:*
> docker run -p=5:5 apache/beam_python3.7_sdk --worker_pool
>
> -----
> *SSH tunnel to the master node:*
> gcloud compute ssh \
> --project \
> --zone \
> -- -NL 7077:localhost:7077
>
> -----
>
> Thanks,
> Mahan
>
> On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe wrote:
>> Hello. Would this page help? I hope it does.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Running on a pre-deployed Spark cluster
>>
>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077 the master url port?
>> * Yes.
>>
>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>> * The job server should be able to communicate with Spark master node port 7077, so I believe the answer is yes.
>>
>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK Harness Configuration?
>> * This is the configuration for how you want your harness container to spin up.
>>
>> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>
>> For DOCKER, you will need Docker deployed on all Spark worker nodes.
>> > User code is executed within a container started on each worker node
>>
>> I used EXTERNAL when I did this with a Flink cluster before.
>>
>> e.g.
>> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>>
>> 4- Should we run the job-server outside of the Dataproc cluster or should we run it in the master node?
>> * It depends. It could be inside or outside the master node. But if you are connecting to a fully managed service, then outside might be better.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Start JobService that will connect with the Spark master
>>
>> Thanks,
>> Yu
>>
>> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh wrote:
>>>
>>> Hi,
>>>
>>> I have a Python Beam job that works on Dataflow, but we would like to
>>> submit it on a Spark Dataproc cluster with no Flink involvement.
>>> I already spent days but failed to figure out how to use PortableRunner
>>> with the beam_spark_job_server to submit my Python Beam job to Spark
>>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>>> Spark with Dataproc.
>>> Some relevant questions might be: >>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is >>> 7077 the master url port? >>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? >>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK >>> Harness Configuration? >>> 4- Should we run the job-server outside of the Dataproc cluster or >>> should we run it in the master node? >>> >>> Thanks, >>> Mahan >>> >> >> >> -- >> Yu Watanabe >> >> linkedin: www.linkedin.com/in/yuwatanabe1/ >> twitter: twitter.com/yuwtennis >> >> > -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
Re: Submit Python Beam on Spark Dataproc
Hello. Would this page help? I hope it does.

https://beam.apache.org/documentation/runners/spark/

> Running on a pre-deployed Spark cluster

1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077 the master url port?
* Yes.

2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
* The job server should be able to communicate with Spark master node port 7077, so I believe the answer is yes.

3- What's the environment_type? Can we use DOCKER? Then what's the SDK Harness Configuration?
* This is the configuration for how you want your harness container to spin up.

https://beam.apache.org/documentation/runtime/sdk-harness-config/

For DOCKER, you will need Docker deployed on all Spark worker nodes.

> User code is executed within a container started on each worker node

I used EXTERNAL when I did this with a Flink cluster before.

e.g.
https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14

4- Should we run the job-server outside of the Dataproc cluster or should we run it in the master node?
* It depends. It could be inside or outside the master node. But if you are connecting to a fully managed service, then outside might be better.

https://beam.apache.org/documentation/runners/spark/

> Start JobService that will connect with the Spark master

Thanks,
Yu

On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh wrote:
> Hi,
>
> I have a Python Beam job that works on Dataflow, but we would like to
> submit it on a Spark Dataproc cluster with no Flink involvement.
> I already spent days but failed to figure out how to use PortableRunner
> with the beam_spark_job_server to submit my Python Beam job to Spark
> Dataproc. All the Beam docs are about Flink and there is no guideline about
> Spark with Dataproc.
> Some relevant questions might be:
> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
> 7077 the master url port?
> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
> 4- Should we run the job-server outside of the Dataproc cluster or should
> we run it in the master node?
>
> Thanks,
> Mahan

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis
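Pulling the answers above together, a hedged sketch of the client-side options for this setup. The ports are illustrative defaults, not values confirmed in this thread:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',         # Beam Spark job server (tunneled if remote)
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',  # address of the SDK harness worker pool
])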
Re: Running Beam on Flink
Hello.

From this line in the log, may I confirm whether you set up the harness container?

I executed your code in the below environment and it worked. The Python version is newer, though.

OS: Fedora 31
BEAM: 2.19.0
PYTHON: 3.7.6

Compared to my log, I realized the below lines do not exist in your log.

357 [grpc-default-executor-1] INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService - Beam Fn Control client connected with id 1-1
358 [grpc-default-executor-1] INFO org.apache.beam.runners.fnexecution.data.GrpcDataService - Beam Fn Data client connected.

One thing I realized is that in LOOPBACK mode for the harness container, the job server uses the hostname when communicating with Flink.

___
Caused by: java.net.UnknownHostException: fedora-desktop: Name or service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
... 34 more
___

Perhaps temporarily add your hostname to the hosts file, resolving it to 127.0.0.1 (e.g. a line like "127.0.0.1 fedora-desktop" in /etc/hosts), and see if it works?

Thanks,
Yu Watanabe

On Mon, Feb 10, 2020 at 3:29 AM Xander Song wrote:
>
> Hi Jincheng,
>
> Thanks for your help. Yes, I am using Mac. Your suggestion allowed me to
> submit the job on port 8099. However, I am now encountering a different error message.
>
> ERROR:root:java.net.ConnectException: Connection refused
>
> Traceback (most recent call last):
> File "test_beam_local_flink.py", line 18, in
> | apache_beam.Map(lambda x: print(x))
> File "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 481, in __exit__
> self.run().wait_until_finish()
> File "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
> self._options).run(False)
> File "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
> return self.runner.run_pipeline(self, self._options)
> File "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
> result.wait_until_finish()
> File "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
> self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline BeamApp-root-0209182021-fc6ec677_c351e1bb-d428-4475-9910-4544d8ec1de4 failed in state FAILED: java.net.ConnectException: Connection refused
>
> I've attached a text file containing the output from the terminal running the
> Docker container in case that is informative (it is quite lengthy). Any
> suggestions are appreciated.
>
> Best,
> Xander
>
> On Sun, Feb 9, 2020 at 2:44 AM jincheng sun wrote:
>>
>> Hi Xander,
>>
>> Are you using Mac? The option --net=host doesn't work on Mac [1]. Could you
>> try to see if the command
>> `docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 apachebeam/flink1.9_job_server:latest` works?
>>
>> [1] https://forums.docker.com/t/should-docker-run-net-host-work/14215/28
>>
>> Best,
>> Jincheng
>> -
>> Twitter: https://twitter.com/sunjincheng121
>> -
>>
>> On Sun, Feb 9, 2020 at 11:52 AM, Xander Song wrote:
>>>
>>> Do you have any suggestions for addressing this issue? I am unsure of what
>>> to try next.
>>> On Fri, Feb 7, 2020 at 5:55 PM Ankur Goenka wrote:
>>>>
>>>> That seems to be a problem.
>>>>
>>>> When I try the command, I get
>>>>
>>>> $ telnet localhost 8099
>>>> Trying ::1...
>>>> Connected to localhost.
>>>> Escape character is '^]'.
>>>> ^CConnection closed by foreign host.
>>>>
>>>> On Fri, Feb 7, 2020 at 5:34 PM Xander Song wrote:
>>>>>
>>>>> Thanks for the response. After entering telnet localhost 8099, I receive
>>>>>
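For reference, a hedged sketch of the client-side options this thread exercises; LOOPBACK keeps the SDK harness inside the submitting process, which is the mode where the hostname lookup above matters. The port matches the job server command quoted earlier:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',  # Flink job server container from the docker command above
    '--environment_type=LOOPBACK',    # run the SDK harness in the client process
])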
Re: Assertion error when using kafka module in python
Hello.

I have made some progress with Kafka output in Python, but I am still not able to get "WriteToKafka" working. I would appreciate some help.

I was able to get past the assertion error by following the example for ExternalTransform. By defining a ParDo instead of a Map before "WriteToKafka", I was able to use the correct URN.

https://github.com/apache/beam/blob/819a242453eac9a2b3d79df0b259c8cc844b6af1/sdks/python/apache_beam/examples/wordcount_xlang.py#L70

However, I get a coder casting error in the job server.

==
jobserver_1 | java.lang.ClassCastException: org.apache.beam.sdk.coders.ByteArrayCoder cannot be cast to org.apache.beam.sdk.coders.KvCoder at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:1552)
==

In KafkaIO.java, as the error says, it tries to cast the input coder to a KvCoder.

==
KvCoder kvCoder = (KvCoder) input.getCoder();
==

In my code, following the example from ExternalTransform, I have passed a bytes object to the external transform.

==
class ToKV(beam.DoFn):
    def process(self, elem):
        return bytes(elem.encode())

..

res = (
    lines
    | beam.ParDo(ToKV()).with_output_types(bytes)
    | WriteToKafka(
        { 'acks': 'all',
          'bootstrap.servers': 'localhost:9092'},
        'beam',
        'org.apache.kafka.common.serialization.ByteArraySerializer',
        'org.apache.kafka.common.serialization.ByteArraySerializer',
        'localhost:8097' ))
==

So my question is: which coder should I use so that the input can be cast to a KvCoder on the Java side?

Thanks,
Yu

On Thu, Jan 9, 2020 at 7:17 AM Chamikara Jayalath wrote:
>
> Hmm, seems like a Java (external) ParDo is being forwarded to the Python SDK for
> execution somehow. +Maximilian Michels might know more.
>
> On Sun, Jan 5, 2020 at 2:57 AM Yu Watanabe wrote:
>>
>> Hello.
>>
>> I would like to sink data into Kafka using the Kafka module for
>> Python; however, I am getting the below assertion error when the pipeline is executed.
>>
>> --
>> beam_1 | Traceback (most recent call last):
>> beam_1 | File "src/sample.py", line 50, in
>> beam_1 | main()
>> beam_1 | File "src/sample.py", line 46, in main
>> beam_1 | 'localhost:8097' ))
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 427, in __exit__
>> beam_1 | self.run().wait_until_finish()
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 407, in run
>> beam_1 | self._options).run(False)
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 680, in from_runner_api
>> beam_1 | context.transforms.get_by_id(root_transform_id)]
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 | self._id_to_proto[id], self._pipeline_context)
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 | part = context.transforms.get_by_id(transform_id)
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 | self._id_to_proto[id], self._pipeline_context)
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 | part = context.transforms.get_by_id(transform_id)
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
>> beam_1 | self._id_to_proto[id], self._pipeline_context)
>> beam_1 | File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in from_runner_api
>> beam_1 | part = context.transforms.get_by_id(transform_id)
>> beam_1 | File
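The thread ends without a resolution here, but the cast quoted above (KvCoder kvCoder = (KvCoder) input.getCoder();) points at the direction: KafkaIO's Write expects key/value pairs, so the Python side has to emit (key, value) byte tuples rather than plain bytes. A hedged sketch using the present-day apache_beam.io.kafka API; the broker, topic, and expansion service address are placeholders:

import typing

import apache_beam as beam
from apache_beam.io.kafka import WriteToKafka

class ToKV(beam.DoFn):
    def process(self, elem):
        # Key and value must both be bytes; the (key, value) tuple is what
        # Beam encodes with a KvCoder across the language boundary.
        yield b'', elem.encode('utf-8')

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(['hello', 'world'])
        | beam.ParDo(ToKV()).with_output_types(typing.Tuple[bytes, bytes])
        | WriteToKafka(
            producer_config={'bootstrap.servers': 'localhost:9092'},
            topic='beam',
            expansion_service='localhost:8097'))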
Assertion error when using kafka module in python
| WriteToKafka(
    { 'acks': 'all',
      'bootstrap.servers': 'localhost:9092'},
    'beam',
    'org.apache.kafka.common.serialization.ByteArraySerializer',
    'org.apache.kafka.common.serialization.ByteArraySerializer',
    'localhost:8097' ))
--

I have my platform built upon the Docker engine and used the below combination of modules.

1. apache-beam: 2.16.0
2. flink: 1.8
3. python-sdk(37): compiled using release-2.16.0
4. jobserver: compiled using release-2.16.0
5. kafka: 2.3.x (using confluent 5.3.x)

* docker compose
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

Would there be any setting I am missing to avoid the error? I would appreciate assistance in solving it.

Best Regards,
Yu Watanabe

--
Yu Watanabe
yu.w.ten...@gmail.com
Re: Worker pool dies with error: context deadline exceeded
Hello Kyle.

Thank you for the reply, and happy new year to you too!

Yes. I got it working by sharing all the container networks with the host kernel. My docker-compose file is below.

https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

I confirmed it by writing data to a file on GCS.

with beam.Pipeline(options=options) as p:
    lines = ( p | beam.Create(['Hello World.', 'Apache beam']) )

    # Write to GCS
    ( lines | WriteToText('gs://{}/sample.txt'.format(project_id)) )

https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/samples/src/sample.py

RESULT

$ gsutil cat gs://${PROJECT_ID}/sample.txt-0-of-1
Hello World.
Apache beam

Thanks,
Yu Watanabe

On Fri, Jan 3, 2020 at 5:58 AM Kyle Weaver wrote:
> This is the root cause:
>
> > python-sdk_1 | 2019/12/31 02:59:45 Failed to obtain provisioning
> > information: failed to dial server at localhost:45759
>
> The Flink task manager and Beam SDK harness use connections over
> `localhost` to communicate.
>
> You will have to put `taskmanager` and `python-sdk` on the same host.
> Maybe you can try using `--network=host` so they will share the
> namespace. https://docs.docker.com/network/host/
>
> Happy new year!
>
> Kyle
>
> On Mon, Dec 30, 2019 at 7:21 PM Yu Watanabe wrote:
>
>> Hello.
>>
>> I would like to ask a question about the error I am facing with the worker
>> pool of the SDK container.
>> I get the below error when I run the pipeline.
>>
>> python-sdk_1 | 2019/12/31 02:57:26 Starting worker pool 1: python -m
>> apache_beam.runners.worker.worker_pool_main --service_port=5
>> --container_executable=/opt/apache/beam/boot
>> python-sdk_1 | INFO:root:Started worker pool servicer at port:
>> localhost:5 with executable: /opt/apache/beam/boot
>> python-sdk_1 | WARNING:root:Starting worker with command
>> ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:35615',
>> '--artifact_endpoint=localhost:42723', '--provision_endpoint=localhost:45759',
>> '--control_endpoint=localhost:43185']
>> python-sdk_1 | 2019/12/31 02:57:45 Initializing python harness:
>> /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:35615
>> --artifact_endpoint=localhost:42723 --provision_endpoint=localhost:45759
>> --control_endpoint=localhost:43185
>> python-sdk_1 | 2019/12/31 02:59:45 Failed to obtain provisioning
>> information: failed to dial server at localhost:45759
>> python-sdk_1 | caused by:
>> python-sdk_1 | context deadline exceeded
>>
>> In the flink taskmanager's log, it keeps waiting for a response from the sdk container.
>>
>> taskmanager_1 | 2019-12-31 02:57:45,445 INFO
>> org.apache.flink.configuration.GlobalConfiguration -
>> Loading configuration property: query.server.port, 6125
>> taskmanager_1 | 2019-12-31 02:58:26,678 INFO
>> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory
>> - Still waiting for startup of environment from python-sdk:5 for worker id 1-1
>>
>> Looking at the flink jobmanager's log, the error is logged after starting the map
>> transform. So it looks like the request from the taskmanager reaches the sdk
>> container but is not processed correctly.
>> Sounds like I am missing some setting for the sdk container..
>>
>> jobmanager_1 | 2019-12-31 02:57:44,987 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph -
>> DataSource (Impulse) (1/1) (4e6f68fd31bafa066b740943bc3ea736) switched
>> from RUNNING to FINISHED.
>> jobmanager_1 | 2019-12-31 02:57:44,989 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph- >> MapPartition (MapPartition at [2]Create/{FlatMap(> core.py:2468>), Map(decode)}) (1/1) (067ed452ebd15c1175ecde0ae40e8ac7) >
Re: Beam on Flink with Python SDK and using GCS as artifacts directory
Matthew,

> Just to verify that the preferred python version is python3?

The harness container supports both Python 2 and 3.

https://beam.apache.org/documentation/runtime/environments/

In my opinion, considering that Python 2's EOL is Jan 1, 2020, Python 3 would be the choice.

Thanks,
Yu Watanabe

On Tue, Dec 24, 2019 at 8:24 AM Matthew Rafael Magsombol <raffy4...@gmail.com> wrote:

> So I'm able to put this into debug mode within the flink jobmanager and
> taskmanager, and at the moment the taskmanager is complaining about these:
> ``
> 2019-12-23 22:43:06,892 DEBUG org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader
> - Unable to load the library
> 'org_apache_beam_vendor_grpc_v1p21p0_netty_transport_native_epoll_x86_64',
> trying other loading mechanism.
> java.lang.UnsatisfiedLinkError: no
> org_apache_beam_vendor_grpc_v1p21p0_netty_transport_native_epoll_x86_64 in
> java.library.path
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
> at java.lang.System.loadLibrary(System.java:1122)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:369)
> at java.security.AccessController.doPrivileged(Native Method)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:361)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:339)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:198)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Native.(Native.java:61)
> at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Epoll.(Epoll.java:38)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> ``
>
> And the other error is this:
> ``
> 2019/12/23 22:43:09 Executing: python -m apache_beam.runners.worker.sdk_worker_main
> 2019/12/23 22:43:09 Python exited: exec: "python": executable file not found in $PATH
> ``
>
> This is more toward the exec command for the sdk complaining that python
> does not exist (in my current environment, both the job submission and the
> python code itself run inside the jobmanager; I have the flink jobmanager
> container and the flink taskmanager container, both with flink 1.8.3 as the
> base image).
>
> Just to verify that the preferred python version is python3?
>
> So re-pointing `python` to python3 was a success!
>
> I did run into another problem where I sink the transformed data into a
> local file, but I think I'll create a separate thread on that. In
> production, I don't really want to sink into a local file but rather into
> some distributed storage, so this is a non-concern atm.
> > > My last question is...do I need to worry about that java package issue? > > If not, then that's it for now, thanks! > > > On Mon, Dec 23, 2019 at 7:55 AM Kyle Weaver wrote: > >> > It will be great if you can get the error from the failing process. >> >> Note that you will have to set the log level to DEBUG to get output from >> the process. >> >> On Fri, Dec 20, 2019 at 6:23 PM Ankur Goenka wrote: >> >>> Hi Matthew, >>> >>> It will be great if you can get the error from the failing process. >>> My suspicions are: >>> - Flink task manager container access to gcs location. >>> - As you are running Flink task manager in a container >>> "/beam_src_code/sdks/python/container/build/target/launcher/linux_amd64/boot\" >>> path is not applicable. >>> - Boot command is not run from a valid activated python environment. >>> - You can use >>> https://github.com/apache/beam/blob/605c59b383a77b117bb6b07021e8c41cb13b438f/sdks/python/test-suites/portable/py2/build.gradle#L179 &g
Worker pool dies with error: context deadline exceeded
Hello.

I would like to ask a question about the error I am facing with the worker pool of the SDK container. I get the below error when I run the pipeline.

python-sdk_1 | 2019/12/31 02:57:26 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=5 --container_executable=/opt/apache/beam/boot
python-sdk_1 | INFO:root:Started worker pool servicer at port: localhost:5 with executable: /opt/apache/beam/boot
python-sdk_1 | WARNING:root:Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:35615', '--artifact_endpoint=localhost:42723', '--provision_endpoint=localhost:45759', '--control_endpoint=localhost:43185']
python-sdk_1 | 2019/12/31 02:57:45 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:35615 --artifact_endpoint=localhost:42723 --provision_endpoint=localhost:45759 --control_endpoint=localhost:43185
python-sdk_1 | 2019/12/31 02:59:45 Failed to obtain provisioning information: failed to dial server at localhost:45759
python-sdk_1 | caused by:
python-sdk_1 | context deadline exceeded

In the flink taskmanager's log, it keeps waiting for a response from the sdk container.

taskmanager_1 | 2019-12-31 02:57:45,445 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: query.server.port, 6125
taskmanager_1 | 2019-12-31 02:58:26,678 INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory - Still waiting for startup of environment from python-sdk:5 for worker id 1-1

Looking at the flink jobmanager's log, the error is logged after starting the map transform. So it looks like the request from the taskmanager reaches the sdk container but is not processed correctly. Sounds like I am missing some setting for the sdk container..

jobmanager_1 | 2019-12-31 02:57:44,987 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (Impulse) (1/1) (4e6f68fd31bafa066b740943bc3ea736) switched from RUNNING to FINISHED.
jobmanager_1 | 2019-12-31 02:57:44,989 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (MapPartition at [2]Create/{FlatMap(), Map(decode)}) (1/1) (067ed452ebd15c1175ecde0ae40e8ac7) switched from DEPLOYING to RUNNING.

The command line for building the sdk container is

./gradlew :sdks:python:container:py37:dockerPush -Pdocker-repository-root=${GCR_HOSTNAME}/${PROJECT_ID} -Pdocker-tag=release-2.16.0

My docker compose:
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

My pipeline code:
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/samples/src/sample.py

Would there be any settings I need to use for starting up the sdk container?

Best Regards,
Yu Watanabe

--
Yu Watanabe
yu.w.ten...@gmail.com
Re: Protocol message had invalid UTF-8
Kyle. Thank you for the reply.

The error disappeared after building the sdk and job-server from the matching Apache Beam branch (2.16.0 in this case).

Thanks,
Yu Watanabe

On Tue, Dec 31, 2019 at 2:30 AM Kyle Weaver wrote:
> This error can happen when the job server and sdk versions are mismatched
> (due to protobuf incompatibilities). The sdk and job server containers
> should use the same beam version.
>
> On Mon, Dec 30, 2019 at 11:47 AM Yu Watanabe wrote:
>
>> Hello.
>>
>> I would like to get help with an issue I am having in the job server.
>>
>> I have set up flink (session cluster) and the job server, everything in
>> docker; however, the job server seems to reject requests from the beam
>> client, as described in the below error.
>>
>> ==
>> jobserver_1 | [grpc-default-executor-11] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception during validation
>> jobserver_1 | java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_Create/FlatMap()_4
>> jobserver_1 | at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:203)
>> jobserver_1 | at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:112)
>> jobserver_1 | Caused by: org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8.
>> jobserver_1 | at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:141)
>> jobserver_1 | at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.handleTwoBytes(Utf8.java:1909)
>> jobserver_1 | at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.access$700(Utf8.java:1883)
>> ==
>>
>> Below is a description of my environment.
>>
>> 1. The combination of versions of each container:
>>
>> apache beam: 2.16.0
>> sdk: python3.7_sdk: 2.19.0.dev
>> flink: flink1.9_job_server:latest
>>
>> 2. Set up containers using the beam repository.
>>
>> https://github.com/yuwtennis/beam-deployment/wiki/Setting-up-flink-runner
>>
>> 3. Set up docker-compose as below.
>>
>> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker-compose.yml
>>
>> 4. Kept the source code very simple.
>>
>> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/samples/src/sample.py
>>
>> Do I need any other options in the job server to properly
>> serialize/deserialize the incoming message from the beam client?
>>
>> My current options are below.
>> ==
>> command: [ "--artifacts-dir", "${ARTIFACTS_DIR}", "--flink-master-url", "${FLINK_MASTER_URL}", "--job-host", "${JOB_HOST}", "--job-port", "${JOB_PORT}" ]
>> ==
>>
>> I appreciate if I could get some help.
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> yu.w.ten...@gmail.com

--
Yu Watanabe
yu.w.ten...@gmail.com
linkedin: www.linkedin.com/in/yuwatanabe1
twitter: twitter.com/yuwtennis
Protocol message had invalid UTF-8
Hello.

I would like to get help with an issue I am having in the job server.

I have set up flink (session cluster) and the job server, everything in docker; however, the job server seems to reject requests from the beam client, as described in the below error.

==
jobserver_1 | [grpc-default-executor-11] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception during validation
jobserver_1 | java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_Create/FlatMap()_4
jobserver_1 | at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:203)
jobserver_1 | at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:112)
jobserver_1 | Caused by: org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8.
jobserver_1 | at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:141)
jobserver_1 | at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.handleTwoBytes(Utf8.java:1909)
jobserver_1 | at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.access$700(Utf8.java:1883)
==

Below is a description of my environment.

1. The combination of versions of each container:

apache beam: 2.16.0
sdk: python3.7_sdk: 2.19.0.dev
flink: flink1.9_job_server:latest

2. Set up containers using the beam repository.

https://github.com/yuwtennis/beam-deployment/wiki/Setting-up-flink-runner

3. Set up docker-compose as below.

https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker-compose.yml

4. Kept the source code very simple.

https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/samples/src/sample.py

Do I need any other options in the job server to properly serialize/deserialize the incoming message from the beam client?

My current options are below.
==
command: [ "--artifacts-dir", "${ARTIFACTS_DIR}", "--flink-master-url", "${FLINK_MASTER_URL}", "--job-host", "${JOB_HOST}", "--job-port", "${JOB_PORT}" ]
==

I appreciate if I could get some help.

Thanks,
Yu Watanabe

--
Yu Watanabe
yu.w.ten...@gmail.com
Re: How to import an external module inside a ParDo using Apache Flink?
I did not answer Kyle's question. Sorry about that.

> Did you try moving the imports from the process function to the top of main.py?

Yes. Consequently, I moved all imports to the top of each file unless I specifically needed to import inside a function.

On Mon, Oct 7, 2019 at 4:49 PM Yu Watanabe wrote:

> Thank you for the comment.
>
> I finally got this working. I would like to share my experience for people
> who are beginners with the portable runner.
> What I did were the below items when calling functions and classes from an
> external package.
>
> 1. As Kyle said, I needed 'save_main_session' for the sys path to persist
> after pickling.
>
> 2. I needed to push all related files to the worker nodes using the
> "extra_package" option to resolve dependencies.
> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
>
> 3. I needed to write the import syntax in a clear fashion, otherwise I got the
> below error in the task manager.
> It looks like external packages are pushed into
> "/usr/local/lib/python3.5/site-packages" and require the PKG.MODULENAME
> format to work.
>
> import utils
> ...
> File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in
> import utils
> ImportError: No module named 'utils'
>
> Below is my import syntax for the external package in main.py. Other files
> also follow the below syntax.
>
> #
> # Local application/library specific imports
> #
> import pkg_aif.utils as ut
> from pkg_aif.beam_pardo import VerifyFn
> from pkg_aif.beam_pardo import FlattenTagFilesFn
> from pkg_aif.beam_states import StatefulBufferingFn
> from pkg_aif.pipeline_wrapper import pipelineWrapper
> from pkg_aif.frames import Frames
> from pkg_aif.tag_counts import TagCounts
> from pkg_aif.tags import Tags
> from pkg_aif.es_credentials import EsCredentials
> from pkg_aif.s3_credentials import S3Credentials
>
> Below is related information on the above.
>
> The full options for PipelineOptions:
>
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--environment_config={0}".format(DOCKER_REGISTRY),
>     "--environment_type=DOCKER",
>     "--experiments=beam_fn_api",
>     "--parallelism={0}".format(PARALLELISM),
>     "--job_endpoint=localhost:8099",
>     "--extra_package=PATH_TO_SDIST"
> ])
> options.view_as(SetupOptions).save_main_session = True
>
> return beam.Pipeline(options=options)
>
> My setup.py is below.
>
> import setuptools
>
> REQUIRED_PACKAGES = [
>     'apache-beam==2.15.0',
>     'elasticsearch>=7.0.0,<8.0.0',
>     'urllib3',
>     'boto3'
> ]
>
> setuptools.setup(
>     author = 'Yu Watanabe',
>     author_email = 'AUTHOR_EMAIL',
>     url = 'URL',
>     name = 'quality_validation',
>     version = '0.1',
>     install_requires = REQUIRED_PACKAGES,
>     packages = setuptools.find_packages(),
> )
>
> Directory path to setup.py:
>
> admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
> total 20
> drwxr-xr-x 2 admin admin 4096 Oct 2 19:30 dist
> -rw-r--r-- 1 admin admin    0 Sep 5 21:21 __init__.py
> -rw-r--r-- 1 admin admin 3782 Oct 3 11:02 main.py
> drwxr-xr-x 3 admin admin 4096 Oct 3 15:41 pkg_aif
> drwxr-xr-x 2 admin admin 4096 Oct 2 19:30 quality_validation.egg-info
> -rw-r--r-- 1 admin admin  517 Oct 1 15:21 setup.py
>
> Thanks,
> Yu Watanabe
>
> On Fri
Re: How to import external module inside ParDo using Apache Flink ?
Thank you for the comment.

I finally got this working. I would like to share my experience for people who are new to the portable runner. What I did was the following when calling functions and classes from an external package.

1. As Kyle said, I needed 'save_main_session' for the sys path to persist after pickling.

2. I needed to push all related files to the worker nodes using the "extra_package" option to resolve the dependency.
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

3. I needed to write the import statements in a fully qualified fashion, otherwise I got the error below in the task manager. It looks like external packages are pushed into "/usr/local/lib/python3.5/site-packages" and require the PKG.MODULENAME format to work.

==
import utils
...
File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in
import utils
ImportError: No module named 'utils'
==

Below is my import syntax for the external package in main.py. Other files also follow this syntax.

==
#
# Local application/library specific imports
#
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import VerifyFn
from pkg_aif.beam_pardo import FlattenTagFilesFn
from pkg_aif.beam_states import StatefulBufferingFn
from pkg_aif.pipeline_wrapper import pipelineWrapper
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts
from pkg_aif.tags import Tags
from pkg_aif.es_credentials import EsCredentials
from pkg_aif.s3_credentials import S3Credentials
==

Below is related information.

Full options for PipelineOptions.

==
options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_config={0}".format(DOCKER_REGISTRY),
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--parallelism={0}".format(PARALLELISM),
    "--job_endpoint=localhost:8099",
    "--extra_package=PATH_TO_SDIST"
])
options.view_as(SetupOptions).save_main_session = True

return beam.Pipeline(options=options)
==

My setup.py is below.

==
import setuptools

REQUIRED_PACKAGES = [
    'apache-beam==2.15.0',
    'elasticsearch>=7.0.0,<8.0.0',
    'urllib3',
    'boto3'
]

setuptools.setup(
    author = 'Yu Watanabe',
    author_email = 'AUTHOR_EMAIL',
    url = 'URL',
    name = 'quality_validation',
    version = '0.1',
    install_requires = REQUIRED_PACKAGES,
    packages = setuptools.find_packages(),
)
==

Directory path to setup.py.

==
admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
total 20
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
-rw-r--r-- 1 admin admin    0 Sep  5 21:21 __init__.py
-rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
-rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
==

Thanks,
Yu Watanabe

On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver wrote:
> Did you try moving the imports from the process function to the top of
> main.py?
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe wrote:
>> Hello.
>>
>> I would like to ask for help with resolving a dependency issue for an
>> imported module.
>>
>> I have a directory structure as below and I am trying to import the
>> Frames class from frames.py into main.py.
>> =
>> quality-validation/
>> bin/setup.py
>> main.py
>> modules/
>> frames.py
>> =
>>
>> However, when I run the pipeline, I get the error below at the TaskMana
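For anyone following along: --extra_package expects a path to a built source distribution, so the pkg_aif package above has to be packaged before submission. A minimal sketch of the layout and build step this implies (directory and file names are illustrative, taken from the message above):

==
# Layout (illustrative):
#   setup.py
#   main.py
#   pkg_aif/
#       __init__.py
#       utils.py
#       beam_pardo.py
#
# Build the sdist that --extra_package points at (run next to setup.py):
#   python setup.py sdist
# which writes something like dist/quality_validation-0.1.tar.gz, so:
#   --extra_package=dist/quality_validation-0.1.tar.gz
#
# Every module, including those inside pkg_aif, then imports with the
# package prefix, never a bare module name:
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import FlattenTagFilesFn
==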
Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?
Actually there was a good example in the latest wordcount.py in master repo. https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py On Thu, Sep 26, 2019 at 12:00 PM Yu Watanabe wrote: > Thank you for the help. > > I have chosen to remove the super().__init__() . > > Thanks, > Yu > > On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka wrote: > >> super has some issues wile pickling in python3. Please refer >> https://github.com/uqfoundation/dill/issues/300 for more details. >> >> Removing reference to super in your dofn should help. >> >> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe >> wrote: >> >>> Thank you for the reply. >>> >>> " save_main_session" did not work, however, situation had changed. >>> >>> 1. get_all_options() output. "save_main_session" set to True. >>> >>> = >>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options: >>> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform': >>> None, 'dataflow_endpoint': 'https://dataflow.googleapis.com', >>> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest', >>> 'machine_type': None, 'enable_streaming_engine': False, 'sdk_location': >>> 'default', 'profile_memory': False, 'max_num_workers': None, >>> 'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False, >>> 'setup_file': None, 'network': None, 'on_success_matcher': None, >>> 'requirements_cache': None, 'service_account_email': None, >>> 'environment_type': 'DOCKER', 'disk_type': None, 'labels': None, >>> 'profile_location': None, 'direct_runner_use_stacked_bundle': True, >>> 'use_public_ips': None, * 'save_main_session': True, *** >>> 'direct_num_workers': 1, 'num_workers': None, >>> 'worker_harness_container_image': None, 'template_location': None, >>> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False, >>> 'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner': >>> 'PortableRunner', 'project': None, 'dataflow_kms_key': None, >>> 'job_endpoint': 'localhost:8099', 'extra_packages': None, >>> 'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm': >>> None, 'staging_location': None, 'job_name': None, 'no_auth': False, >>> 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0, >>> 'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None, >>> 'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism': >>> 0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None, >>> 'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file': >>> None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None} >>> >>> >>> = >>> >>> 2. Error in Task Manager log did not change. >>> >>> == >>> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, >>> in find_class >>> return StockUnpickler.find_class(self, module, name) >>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on >> 'apache_beam.runners.worker.sdk_worker_main' from >>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'> >>> >>> == >>> >>> 3. However, if I comment out "super().__init__()" in my code , error >>> changes. 
>>> >>> == >>> File >>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", >>> line 1078, in _create_pardo_operation >>> dofn_data = pickler.loads(serialized_fn) >>> File >>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >>> line 265, in loads >>> return dill.loads(s) >>> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, >>> in loads >>> return load(file, ignore) >>> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, >>> in load >>> obj = pik.load() >>> File "/usr/local/lib/python3.5/site-packages/dill/_dill
Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?
Thank you for the help. I have chosen to remove the super().__init__() . Thanks, Yu On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka wrote: > super has some issues wile pickling in python3. Please refer > https://github.com/uqfoundation/dill/issues/300 for more details. > > Removing reference to super in your dofn should help. > > On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe wrote: > >> Thank you for the reply. >> >> " save_main_session" did not work, however, situation had changed. >> >> 1. get_all_options() output. "save_main_session" set to True. >> >> = >> 2019-09-26 09:04:11,586 DEBUG Pipeline Options: >> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform': >> None, 'dataflow_endpoint': 'https://dataflow.googleapis.com', >> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest', >> 'machine_type': None, 'enable_streaming_engine': False, 'sdk_location': >> 'default', 'profile_memory': False, 'max_num_workers': None, >> 'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False, >> 'setup_file': None, 'network': None, 'on_success_matcher': None, >> 'requirements_cache': None, 'service_account_email': None, >> 'environment_type': 'DOCKER', 'disk_type': None, 'labels': None, >> 'profile_location': None, 'direct_runner_use_stacked_bundle': True, >> 'use_public_ips': None, * 'save_main_session': True, *** >> 'direct_num_workers': 1, 'num_workers': None, >> 'worker_harness_container_image': None, 'template_location': None, >> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False, >> 'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner': >> 'PortableRunner', 'project': None, 'dataflow_kms_key': None, >> 'job_endpoint': 'localhost:8099', 'extra_packages': None, >> 'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm': >> None, 'staging_location': None, 'job_name': None, 'no_auth': False, >> 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0, >> 'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None, >> 'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism': >> 0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None, >> 'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file': >> None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None} >> >> >> = >> >> 2. Error in Task Manager log did not change. >> >> == >> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, >> in find_class >> return StockUnpickler.find_class(self, module, name) >> AttributeError: Can't get attribute 'FlattenTagFilesFn' on > 'apache_beam.runners.worker.sdk_worker_main' from >> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'> >> >> == >> >> 3. However, if I comment out "super().__init__()" in my code , error >> changes. 
>> >> == >> File >> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", >> line 1078, in _create_pardo_operation >> dofn_data = pickler.loads(serialized_fn) >> File >> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", >> line 265, in loads >> return dill.loads(s) >> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, >> in loads >> return load(file, ignore) >> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, >> in load >> obj = pik.load() >> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, >> in find_class >> return StockUnpickler.find_class(self, module, name) >> ImportError: No module named 's3_credentials' >> == >> >> >> 4. My whole class is below. >> >> == >> class FlattenTagFilesFn(beam.DoFn): >> def __init__(self, s3Bucket, s3Creds, maxKeys=1000): >> self.s3Bucket = s3Bucket >> se
Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?
yaml2['head']['operator_id'],  # Post OperatorId
                preFrm[f],             # Pre Frame Line
                postFrm[f],            # Post Frame Line
                info['pre'][0][1],     # Pre Last Modified Time
                info['post'][0][1])    # Post Last Modified Time

            yield (frames)

            tagCounts = TagCounts(
                self.s3Bucket,
                yaml1,                # Pre Yaml
                yaml2,                # Post Yaml
                info['pre'][0][0],    # Pre S3Key
                info['post'][0][0],   # Post S3Key
                info['pre'][0][1],    # Pre Last Modified Time
                info['post'][0][1])   # Post Last Modified Time

            yield beam.pvalue.TaggedOutput('counts', tagCounts)
==

I was using super() to define a single boto instance in the ParDo. May I ask, is there a way to call super() in the constructor of a ParDo?

Thanks,
Yu

On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver wrote:
> You will need to set the save_main_session pipeline option to True.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe wrote:
>> Hello.
>>
>> I would like to ask a question about ParDo.
>>
>> I am getting the error below inside the TaskManager when running code on
>> Apache Flink using the Portable Runner.
>> =
>> File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>> File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>     return dill.loads(s)
>> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>     return load(file, ignore)
>> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>     obj = pik.load()
>> File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>> =
>>
>> "FlattenTagFilesFn" is defined as a ParDo and called from the pipeline as
>> below.
>> =
>> frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>     | 'combined:cogroup' >> beam.CoGroupByKey()
>>     | 'combined:exclude' >> beam.Filter(lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>     | 'combined:flat' >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>>                              .with_outputs('counts', main='frames'))
>> =
>>
>> In the same file I have defined the class as below.
>> =
>> class FlattenTagFilesFn(beam.DoFn):
>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>         self.s3Bucket = s3Bucket
>>         self.s3Creds = s3Creds
>>         self.maxKeys = maxKeys
>> =
>>
>> This is not a problem when running the pipeline using the DirectRunner.
>> May I ask how I should import the class for the ParDo when running on Flink?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.ten...@gmail.com
>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
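On the super() question, a common workaround is a sketch like the one below (my assumption, based on this thread, is that the goal is one boto3 client per worker rather than one per element): keep __init__ free of both super().__init__() and any live client, and build the client lazily when the bundle starts. The credential attribute names on s3Creds are hypothetical:

==
import boto3
import apache_beam as beam

class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        # Only picklable configuration here; no super().__init__() and no
        # live clients, so dill can ship the DoFn to the workers.
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
        self.s3 = None

    def start_bundle(self):
        # Created after deserialization on the worker; reused across elements.
        if self.s3 is None:
            self.s3 = boto3.client(
                's3',
                aws_access_key_id=self.s3Creds.access_key,      # hypothetical attribute
                aws_secret_access_key=self.s3Creds.secret_key)  # hypothetical attribute

    def process(self, element):
        # ... use self.s3 here ...
        yield element
==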
How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?
Hello. I would like to ask a question about ParDo.

I am getting the error below inside the TaskManager when running code on Apache Flink using the Portable Runner.

=
File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main'>
=

"FlattenTagFilesFn" is defined as a ParDo and called from the pipeline as below.

=
frames, counts = ({'pre': pcollPre, 'post': pcollPost}
    | 'combined:cogroup' >> beam.CoGroupByKey()
    | 'combined:exclude' >> beam.Filter(lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
    | 'combined:flat' >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
                             .with_outputs('counts', main='frames'))
=

In the same file I have defined the class as below.

=
class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
=

This is not a problem when running the pipeline using the DirectRunner. May I ask how I should import the class for the ParDo when running on Flink?

Thanks,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
Re: How to reference manifest from apache flink worker node ?
I needed a little adjustment to the pipeline options to make this work. The pipeline options required 'job_endpoint' when manually starting the job server from the jar file.

=
options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_config=asia.gcr.io/creationline001/beam/python3:latest",
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--job_endpoint=localhost:8099"
])
=

Otherwise, the runner spins up its own job server container.

==
ERROR:root:Starting job service with ['docker', 'run', '-v', '/usr/bin/docker:/bin/docker', '-v', '/var/run/docker.sock:/var/run/docker.sock', '--network=host', 'admin-docker-apache.bintray.io/beam/flink-job-server:latest', '--job-host', 'localhost', '--job-port', '42915', '--artifact-port', '56561', '--expansion-port', '53857']
ERROR:root:Error bringing up job service
==

Without the "job_endpoint" option the default job server is used.

==
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/portable_runner.py#L176
    server = self.default_job_server(options)
=>
https://github.com/apache/beam/blob/d963aeb91a63f165b5ff1ebf6add8275aec204f1/sdks/python/apache_beam/runners/portability/job_server.py#L172
    "-docker-apache.bintray.io/beam/flink-job-server:latest"
==

Thanks,
Yu

On Tue, Sep 24, 2019 at 2:06 PM Ankur Goenka wrote:
> Flink job server does not have the artifacts-dir option yet.
> We have a PR to add it: https://github.com/apache/beam/pull/9648
>
> However, for now you can do a few manual steps to achieve this.
>
> Start the job server.
>
> 1. Download
> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
>
> 2. Start the job server
> java -jar /home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar --flink-master-url ip-172-31-12-113.ap-northeast-1.compute.internal:8081 --artifacts-dir /home/ec2-user --job-port 8099
>
> 3. Submit your pipeline
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--environment_config=asia.gcr.io/creationline001/beam/python3:latest",
>     "--environment_type=DOCKER",
>     "--experiments=beam_fn_api"
> ])
>
> On Mon, Sep 23, 2019 at 8:20 PM Yu Watanabe wrote:
>> Kyle.
>>
>> Thank you for the assistance.
>>
>> It looks like "--artifacts-dir" is not parsed. Below is the DEBUG log of the pipeline.
>>
>> WARNING:root:Discarding unparseable args: ['--flink_version=1.8', '--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir=/home/ec2-user']
>> ...
>> WARNING:root:Downloading job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
>> DEBUG:root:Starting job service with ['java', '-jar', '/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar', '--flink-master-url', 'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir', '/tmp/artifactsicq2kj8c', '--job-port', 41757, '--artifact-port', 0, '--expansion-port', 0]
>> DEBUG:root:Waiting for jobs grpc channel to be ready at localhost:41757.
>> ...
>> DEBUG:root:Runner option 'job_endpoint' was already added
>> DEBUG:root:Runner option 'sdk_worker_parallelism' was already added
>> WARNING:root:Discarding unparseable args: ['--artifacts-dir=/home/admin']
>> ...
>>
>> My pipeline options.
>> >> options = PipelineOptions([ >> "--runner=FlinkRunner", >> "--flink_version=1.8", >> >> "--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081" >> , >> "--environment_config= >> asia.gcr.io/creationli
Re: How to reference manifest from apache flink worker node ?
Ankur. Thank you for the comment. Manual start up with job server looks more solid. Setting TMPDIR then using Flink Runner sounded bit hacky . Thanks, Yu On Tue, Sep 24, 2019 at 2:06 PM Ankur Goenka wrote: > Flink job server does not have artifacts-dir option yet. > We have a PR to add it https://github.com/apache/beam/pull/9648 > > However, for now you can do a few manual steps to achieve this. > > Start Job server. > > 1. Download > https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar > > 2. Start the job server > java -jar > /home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar > --flink-master-url ip-172-31-12-113.ap-northeast-1.compute.internal:8081 > --artifacts-dir /home/ec2-user --job-port 8099 > > 3. Submit your pipeline > options = PipelineOptions([ > "--runner=PortableRunner", > "--environment_config= > asia.gcr.io/creationline001/beam/python3:latest", > "--environment_type=DOCKER", > "--experiments=beam_fn_api" > ]) > > On Mon, Sep 23, 2019 at 8:20 PM Yu Watanabe wrote: > >> Kyle. >> >> Thank you for the assistance. >> >> Looks like "--artifacts-dir" is not parsed. Below is the DEBUG log of the >> pipline. >> >> WARNING:root:Discarding unparseable args: ['--flink_version=1.8', >> '--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081', >> '--artifacts-dir=/home/ec2-user'] >> ... >> WARNING:root:Downloading job server jar from >> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar >> DEBUG:root:Starting job service with ['java', '-jar', >> '/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar', >> '--flink-master-url', >> 'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir', >> '/tmp/artifactsicq2kj8c', '--job-port', 41757, '--artifact-port', 0, >> '--expansion-port', 0] >> DEBUG:root:Waiting for jobs grpc channel to be ready at localhost:41757. >> ... >> DEBUG:root:Runner option 'job_endpoint' was already added >> DEBUG:root:Runner option 'sdk_worker_parallelism' was already added >> WARNING:root:Discarding unparseable args: ['--artifacts-dir=/home/admin'] >> ... >> >> >> My pipeline option . >> >> options = PipelineOptions([ >> "--runner=FlinkRunner", >> "--flink_version=1.8", >> >> "--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081" >> , >> "--environment_config= >> asia.gcr.io/creationline001/beam/python3:latest", >> "--environment_type=DOCKER", >> "--experiments=beam_fn_api", >> "--artifacts-dir=/home/admin" >> ]) >> >> >> Tracing from the log , looks like artifacts dir respects the default >> tempdir of OS. >> Thus, to adjust it I will need environment variable instead. I used >> 'TMPDIR' in my case. >> >> >> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L203 >> artifacts_dir = self.local_temp_dir(prefix='artifacts') >> >> => >> >> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L157 >> return tempfile.mkdtemp(dir=self._local_temp_root, **kwargs) >> >> => >> >> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L102 >> self._local_temp_root = None >> >> >> Then it worked. 
>> >> (python-bm-2150) admin@ip-172-31-9-89:~$ env | grep TMPDIR >> TMPDIR=/home/admin >> >> => >> DEBUG:root:Sta
Re: How to reference manifest from apache flink worker node ?
Kyle.

Thank you for the assistance.

It looks like "--artifacts-dir" is not parsed. Below is the DEBUG log of the pipeline.

==
WARNING:root:Discarding unparseable args: ['--flink_version=1.8', '--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir=/home/ec2-user']
...
WARNING:root:Downloading job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
DEBUG:root:Starting job service with ['java', '-jar', '/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar', '--flink-master-url', 'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir', '/tmp/artifactsicq2kj8c', '--job-port', 41757, '--artifact-port', 0, '--expansion-port', 0]
DEBUG:root:Waiting for jobs grpc channel to be ready at localhost:41757.
...
DEBUG:root:Runner option 'job_endpoint' was already added
DEBUG:root:Runner option 'sdk_worker_parallelism' was already added
WARNING:root:Discarding unparseable args: ['--artifacts-dir=/home/admin']
...
==

My pipeline options:

==
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081",
    "--environment_config=asia.gcr.io/creationline001/beam/python3:latest",
    "--environment_type=DOCKER",
    "--experiments=beam_fn_api",
    "--artifacts-dir=/home/admin"
])
==

Tracing through the log, it looks like the artifacts dir respects the default temp dir of the OS. Thus, to adjust it I needed an environment variable instead. I used 'TMPDIR' in my case.

==
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L203
    artifacts_dir = self.local_temp_dir(prefix='artifacts')
=>
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L157
    return tempfile.mkdtemp(dir=self._local_temp_root, **kwargs)
=>
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L102
    self._local_temp_root = None
==

Then it worked.

==
(python-bm-2150) admin@ip-172-31-9-89:~$ env | grep TMPDIR
TMPDIR=/home/admin
=>
DEBUG:root:Starting job service with ['java', '-jar', '/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar', '--flink-master-url', 'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir', '/home/admin/artifacts18unmki3', '--job-port', 46767, '--artifact-port', 0, '--expansion-port', 0]
==

I will use an NFS server for now to share the artifact directory with the worker nodes.

Thanks.
Yu Watanabe

On Tue, Sep 24, 2019 at 9:50 AM Kyle Weaver wrote:
> The relevant configuration flag for the job server is `--artifacts-dir`.
>
> @Robert Bradshaw I added this info to the log message: https://github.com/apache/beam/pull/9646
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
> On Mon, Sep 23, 2019 at 11:36 AM Robert Bradshaw wrote:
>> You need to set your artifact directory to point to a distributed
>> filesystem that's also accessible to the workers (when starting up the
>> job server).
>>
>> On Mon, Sep 23, 2019 at 5:43 AM Yu Watanabe wrote:
>>> Hello.
>>>
>>> I am working on the flink runner (2.15.0) and would like to ask a
>>> question about how to solve my error.
>>>
>>> Currently, I have a remote cluster deployed as below.
(please see >>> slide1) >>> All master and worker nodes are installed on different server from >>> apache beam. >>> >>> >>> https://drive.google.com/file/d/1vBULp6kiEfQNGVV3Nl2mMKAZZKYsb11h/view?usp=sharing >>> >>> When I run beam pipeline, harness container tries to start up, however, >>> fails immediately with below error on docker side. >>> >>> = >>> Sep 23 21:04:05 ip
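A compact recap of the workaround above, as a sketch rather than a definitive recipe (the NFS path is illustrative, and this assumes the variable is set before Beam first touches Python's tempfile machinery): point TMPDIR at a directory the Flink task managers can also see, so the generated artifactsXXXX directory is reachable from the workers. Exporting TMPDIR in the shell before launching, as shown in the thread above, is equivalent.

==
import os

# Illustrative mount; must be shared with the Flink task managers, and
# must be set before Beam creates the job server's temp artifact dir.
os.environ['TMPDIR'] = '/mnt/nfs/beam-artifacts'

# ... build PipelineOptions and run the pipeline as usual afterwards ...
==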
How to reference manifest from apache flink worker node ?
Hello. I am working on flink runner (2.15.0) and would like to ask question about how to solve my error. Currently , I have a remote cluster deployed as below . (please see slide1) All master and worker nodes are installed on different server from apache beam. https://drive.google.com/file/d/1vBULp6kiEfQNGVV3Nl2mMKAZZKYsb11h/view?usp=sharing When I run beam pipeline, harness container tries to start up, however, fails immediately with below error on docker side. = Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: 2019/09/23 12:04:05 Initializing python harness: /opt/apache/beam/boot --id=1 --logging_endpoint=localhost:34227 --artifact_endpoint=localhost:45303 --provision_endpoint=localhost:44585 --control_endpoint=localhost:43869 Sep 23 21:04:05 ip-172-31-0-143 dockerd: time="2019-09-23T21:04:05.380942292+09:00" level=debug msg=event module=libcontainerd namespace=moby topic=/tasks/start Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: 2019/09/23 12:04:05 Failed to retrieve staged files: failed to get manifest Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: #011caused by: Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: rpc error: code = Unknown desc = = At the same time, task manager logs below error. = 2019-09-23 21:04:05,525 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - GetManifest for /tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST 2019-09-23 21:04:05,526 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Loading manifest for retrieval token /tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST 2019-09-23 21:04:05,531 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - GetManifest for /tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST failed java.util.concurrent.ExecutionException: java.io.FileNotFoundException: /tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST (No such file or directory) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531) ... = I see this artifact directory on the server where beam pipeline is executed but not on worker node. = # Beam server (python-bm-2150) admin@ip-172-31-9-89:~$ sudo ls -ld /tmp/artifactsfkyik3us drwx-- 3 admin admin 4096 Sep 23 12:03 /tmp/artifactsfkyik3us # Flink worker node [ec2-user@ip-172-31-0-143 flink]$ sudo ls -ld /tmp/artifactsfkyik3us ls: cannot access /tmp/artifactsfkyik3us: No such file or directory = >From the error, it seems that container is not starting up correctly due to manifest file is missing. What would be a good approach to reference artifact directory from worker node? I appreciate if I could get some advice . Best Regards, Yu Watanabe -- Yu Watanabe Weekend Freelancer who loves to challenge building data platform yu.w.ten...@gmail.com [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1> [image: Twitter icon] <https://twitter.com/yuwtennis>
Re: Submitting job to AWS EMR fails with error=2, No such file or directory
Thank you for the reply. > Why not manually docker pull the image (remember to adjust the container location to omit to registry) locally first? Actually, when using docker, I have already pulled the image from remote repository (GCR). Is there a way to make Flink Runner to not call "docker pull" if the container is already pulled ? Thanks, Yu Watanabe On Fri, Sep 20, 2019 at 7:48 PM Benjamin Tan wrote: > Seems like some file is missing. Why not manually docker pull the image > (remember to adjust the container location to omit to registry) locally > first? At least you can eliminate another source of trouble. > > Also, depending on your pipeline, if you are running distributed, you must > make sure your files are accessible. For example, local paths won’t work at > all. > > So maybe you can force a single worker first too. > > Sent from my iPhone > > On 20 Sep 2019, at 5:11 PM, Yu Watanabe wrote: > > Ankur > > Thank you for the advice. > You're right. Looking at the task manager's log, looks like first "docker > pull" fails from yarn user and then couple of errors comes after. > As a result, "docker run" seems to fail. > I have been working on whole week and still not manage through from yarn > session to get authenticated against Google Container Registry... > > > == > 2019-09-19 06:47:38,196 INFO org.apache.flink.runtime.taskmanager.Task > - MapPartition (MapPartition at [3]{Create, > ParDo(EsOutputFn)}) (1/1) (d2f0d79e4614c3b0cb5a8cbd38de37da) switched from > DEPLOYING to RUNNING. > 2019-09-19 06:47:41,181 WARN > org.apache.beam.runners.fnexecution.environment.DockerCommand - Unable to > pull docker image asia.gcr.io/PROJECTNAME/beam/python3:latest, cause: > Received exit code 1 for command 'docker pull > asia.gcr.io/creationline001/beam/python3:latest'. stderr: Error response > from daemon: unauthorized: You don't have the needed permissions to perform > this operation, and you may have invalid credentials. To authenticate your > request, follow the steps in: > https://cloud.google.com/container-registry/docs/advanced-authentication > 2019-09-19 06:47:44,035 INFO > > org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService > - GetManifest for > /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST > 2019-09-19 06:47:44,037 INFO > > org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService > - Loading manifest for retrieval token > /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST > 2019-09-19 06:47:44,046 INFO > > org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService > - GetManifest for > /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST > failed > java.util.concurrent.ExecutionException: java.io.FileNotFoundException: > /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST > (No such file or directory) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492) > ... 
> Caused by: java.io.FileNotFoundException: > /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST > (No such file or directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.(FileInputStream.java:138) > at > org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:114) > ... > 2019-09-19 06:48:43,952 ERROR org.apache.flink.runtime.operators.BatchTask > - Error in task code: MapPartition (MapPartition at > [3]{Create, ParDo(EsOutputFn)}) (1/1) > java.lang.Exception: The user defined 'open()' method caused an exception: > java.io.IOException: Received exit code 1 for command 'docker inspect -f > {{.State.Running}} > 7afbdcfd241629d24872ba1c74ef10f3d07c854c9cc675a65d4d16b9fdbde752'. stderr: > Error: No such object: > 7afbdcfd241629d24872ba1c74ef10f3d07c854c9cc675a65d4d16b9fdbde752 > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > ... > >
Re: Submitting job to AWS EMR fails with error=2, No such file or directory
=-- -rwxrwxrwx 1 yarn ec2-user 16543786 Sep 19 04:39 boot ===-- Looks like the PROCESS command was blocked because of this error. ===-- java.util.concurrent.ExecutionException: java.io.FileNotFoundException: /tmp/artifactskjsmgv8p/job_ea4673de-71d0-4dd9-b683-fe8c52464666/MANIFEST (No such file or directory) =======-- Thanks, Yu Watanabe On Wed, Sep 18, 2019 at 11:37 PM Benjamin Tan wrote: > Try this as part of PipelineOptions: > > --environment_config={\"command\":\"/opt/apache/beam/boot\"} > > On 2019/09/18 10:40:42, Yu Watanabe wrote: > > Hello. > > > > I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit > job > > to AWS EMR (5.26.0). > > > > However, I get below error when I run the pipeline and fail. > > > > - > > Caused by: java.lang.Exception: The user defined 'open()' method caused > an > > exception: java.io.IOException: Cannot run program "docker": error=2, No > > such file or directory > > at > > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498) > > at > > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > ... 1 more > > Caused by: > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: > > java.io.IOException: Cannot run program "docker": error=2, No such file > or > > directory > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185) > > at > > > org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49) > > at > > > org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203) > > at > > > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129) > > at > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > at > > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494) > > ... 
3 more > > Caused by: java.io.IOException: Cannot run program "docker": error=2, No > > such file or directory > > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) > > at > > > org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141) > > at > > > org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92) > > at > > > org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162) > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) > > at > > > org.apach
apache beam 2.16.0 ?
Hello. I would like to use 2.16.0 to diagnose a container problem; however, it looks like the job server is not available on Maven yet.

==
RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.16.0/beam-runners-flink-1.8-job-server-2.16.0.jar: HTTP Error 404: Not Found
==

I checked the Maven repo and indeed there is no job server 2.16.0 yet.

https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink

Will 2.16.0 be released soon?

Thanks,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
Re: Submitting job to AWS EMR fails with error=2, No such file or directory
Thank you for the reply. I see files "boot" under below directories. But these seems to be used for containers. (python) admin@ip-172-31-9-89:~/beam-release-2.15.0$ find ./ -name "boot" -exec ls -l {} \; lrwxrwxrwx 1 admin admin 23 Sep 16 23:43 ./sdks/python/container/.gogradle/project_gopath/src/ github.com/apache/beam/sdks/python/boot -> ../../../../../../../.. -rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48 ./sdks/python/container/build/target/launcher/linux_amd64/boot -rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48 ./sdks/python/container/build/target/launcher/darwin_amd64/boot -rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48 ./sdks/python/container/py3/build/docker/target/linux_amd64/boot -rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48 ./sdks/python/container/py3/build/docker/target/darwin_amd64/boot -rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48 ./sdks/python/container/py3/build/target/linux_amd64/boot -rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48 ./sdks/python/container/py3/build/target/darwin_amd64/boot On Wed, Sep 18, 2019 at 11:37 PM Benjamin Tan wrote: > Try this as part of PipelineOptions: > > --environment_config={\"command\":\"/opt/apache/beam/boot\"} > > On 2019/09/18 10:40:42, Yu Watanabe wrote: > > Hello. > > > > I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit > job > > to AWS EMR (5.26.0). > > > > However, I get below error when I run the pipeline and fail. > > > > - > > Caused by: java.lang.Exception: The user defined 'open()' method caused > an > > exception: java.io.IOException: Cannot run program "docker": error=2, No > > such file or directory > > at > > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498) > > at > > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > ... 1 more > > Caused by: > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: > > java.io.IOException: Cannot run program "docker": error=2, No such file > or > > directory > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185) > > at > > > org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49) > > at > > > org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203) > > at > > > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129) > > at > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > at > > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494) > > ... 
3 more > > Caused by: java.io.IOException: Cannot run program "docker": error=2, No > > such file or directory > > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) > > at > > > org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141) > > at > > > org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92) > > at > > > org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178) > > at > > > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162) > > at > > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) > > at > >
Re: Submitting job to AWS EMR fails with error=2, No such file or directory
t kill container: 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b: No such container: 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:179)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:134)
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:174)
    ... 21 more
=

Next, I'd like to try the environment type 'PROCESS', but it seems you need an external command for 'environment_config'.

https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/portable_runner.py#L138
    config = json.loads(portable_options.environment_config)

May I ask what command I need to set as the shell script for each task manager?

Best Regards,
Yu Watanabe

On Wed, Sep 18, 2019 at 9:39 PM Benjamin Tan wrote:
> Seems like docker is not installed. Maybe run with PROCESS as the
> environment type? Or install docker.
>
> Sent from my iPhone
>
> On 18 Sep 2019, at 6:40 PM, Yu Watanabe wrote:
>
> Hello.
>
> I am trying to run FlinkRunner (2.15.0) on an AWS EC2 instance and submit
> the job to AWS EMR (5.26.0).
>
> However, I get the error below when I run the pipeline.
>
> -
> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     ... 1 more
> Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:211)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:202)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
>     at org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
>     at org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
>     ... 3 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
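On the PROCESS question, a hedged sketch based on Benjamin's earlier suggestion in this thread: with environment_type=PROCESS the task managers exec the SDK harness boot binary directly instead of launching a Docker container, so the path given in the command must already exist on every task manager. The job endpoint below is illustrative:

==
# Sketch only: PROCESS environment with the boot binary suggested above.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # illustrative endpoint
    "--experiments=beam_fn_api",
    "--environment_type=PROCESS",
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])
==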
Submitting job to AWS EMR fails with error=2, No such file or directory
=
(python) [ec2-user@ip-172-31-2-121 ~]$ docker info
Containers: 0
 Running: 0
 Paused: 0
 Stopped: 0
...
=

I used "debian-stretch":
debian-stretch-hvm-x86_64-gp2-2019-09-08-17994-572488bb-fc09-4638-8628-e1e1d26436f4-ami-0ed2d2283aa1466df.4 (ami-06f16171199d98c63)

This does not seem to happen when Flink runs locally.

=
admin@ip-172-31-9-89:/opt/flink$ sudo ss -atunp | grep 8081
tcp    LISTEN    0    128    :::8081    :::*    users:(("java",pid=18420,fd=82))
admin@ip-172-31-9-89:/opt/flink$ sudo ps -ef | grep java | head -1
admin    17698    1    0 08:59 ?    00:00:12 java -jar /home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar --flink-master-url ip-172-31-1-84.ap-northeast-1.compute.internal:43581 --artifacts-dir /tmp/artifactskj47j8yn --job-port 48205 --artifact-port 0 --expansion-port 0
=

Would there be any other settings I need to look for when running on an EC2 instance?

Thanks,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
Re: How to use google container registry for FlinkRunner ?
Kyle, Benjamin Thank you for the quick reply. And yes, adding "environment_config" to Pipeline options did the trick. === options = PipelineOptions([ "--runner=FlinkRunner", "--flink_version=1.8", "--flink_master_url=localhost:8081", "--environment_config= asia.gcr.io/PROJECTNAME/beam/python3", "--experiments=beam_fn_api" ]) p = beam.Pipeline(options=options) ======= Thanks, Yu Watanabe On Tue, Sep 17, 2019 at 11:21 AM Benjamin Tan wrote: > Something like this: > > options = PipelineOptions(["--runner=PortableRunner", >"--environment_config=apachebeam/python3.7_sdk > ", # <--- >"--job_endpoint=dnnserver2:8099"]) > > On 2019/09/17 02:14:00, Kyle Weaver wrote: > > I think environment_config is the option you are looking for. > > > > On Mon, Sep 16, 2019 at 7:06 PM Yu Watanabe > wrote: > > > > > Hello. > > > > > > For testing I would like to use image uploaded to google container > > > registry. > > > How can I use images pulled from other than bintray.io ? > > > > > > > > > > > > > admin@ip-172-31-9-89:~$ gcloud container images list --repository > > > asia.gcr.io/[PROJECTNAME] <http://asia.gcr.io/%5BPROJECTNAME%5D> < > http://asia.gcr.io/%5BPROJECTNAME%5D> > > > NAME > > > asia.gcr.io/[PROJECTNAME]/beam > <http://asia.gcr.io/%5BPROJECTNAME%5D/beam> < > http://asia.gcr.io/%5BPROJECTNAME%5D/beam> > > > > > > > > > > > > > Looks like FlinkRunner (2.15.0) uses bintray repository as a default > > > behavior. As a result I am not able to use > > > > > > Caused by: java.lang.Exception: The user defined 'open()' method > caused an > > > exception: java.io.IOException: Received exit code 125 for command > 'docker > > > run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm > > > ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1 > > > --logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681 > > > --provision_endpoint=localhost:45089 > --control_endpoint=localhost:37045'. > > > stderr: Unable to find image ' > > > ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker: > > > Error response from daemon: manifest for > > > ywatanabe-docker-apache.bintray.io/beam/python3:latest not found: > > > manifest unknown: The named manifest is not known to the registry.See > > > 'docker run -- > > > > > > According to online doc, there is not parameter which controls which > image > > > to use . > > > > > > https://beam.apache.org/documentation/runners/flink/ > > > > > > Pipeline options I am using is below. > > > > > > > > > > > > > options = PipelineOptions([ > > > "--runner=FlinkRunner", > > > "--flink_version=1.8", > > > "--flink_master_url=localhost:8081", > > > "--worker_harness_container_image= > > > asia.gcr.io/PROJECTNAME/beam/python3", > > > "--experiments=beam_fn_api" > > > ]) > > > > > > > > > > > > > Perhaps is there any environment variable to specify which image to > use ? > > > > > > Best Regards, > > > Yu Watanabe > > > > > > -- > > > Yu Watanabe > > > Weekend Freelancer who loves to challenge building data platform > > > yu.w.ten...@gmail.com > > > [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1> > [image: > > > Twitter icon] <https://twitter.com/yuwtennis> > > > > > -- > > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com > > > -- Yu Watanabe Weekend Freelancer who loves to challenge building data platform yu.w.ten...@gmail.com [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1> [image: Twitter icon] <https://twitter.com/yuwtennis>
How to use google container registry for FlinkRunner ?
Hello.

For testing I would like to use an image uploaded to Google Container Registry. How can I use images pulled from somewhere other than bintray.io?

==
admin@ip-172-31-9-89:~$ gcloud container images list --repository asia.gcr.io/[PROJECTNAME]
NAME
asia.gcr.io/[PROJECTNAME]/beam
==

It looks like FlinkRunner (2.15.0) uses the bintray repository by default. As a result I am not able to use my own image.

==
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1 --logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681 --provision_endpoint=localhost:45089 --control_endpoint=localhost:37045'. stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
docker: Error response from daemon: manifest for ywatanabe-docker-apache.bintray.io/beam/python3:latest not found: manifest unknown: The named manifest is not known to the registry.
See 'docker run --
==

According to the online doc, there is no parameter that controls which image to use.

https://beam.apache.org/documentation/runners/flink/

The pipeline options I am using are below.

==
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--worker_harness_container_image=asia.gcr.io/PROJECTNAME/beam/python3",
    "--experiments=beam_fn_api"
])
==

Perhaps there is an environment variable to specify which image to use?

Best Regards,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
Re: Flink Runner logging FAILED_TO_UNCOMPRESS
Kyle Thank you for the assistance. By specifying "experiments" in PipelineOptions , == options = PipelineOptions([ "--runner=FlinkRunner", "--flink_version=1.8", "--flink_master_url=localhost:8081", "--experiments=beam_fn_api" ]) == I was able to submit the job successfully. [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9 [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9 [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program. [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment. [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081. [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address' [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started. [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false). Thanks, Yu Watanabe On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver wrote: > Try adding "--experiments=beam_fn_api" to your pipeline options. (This is > a known issue with Beam 2.15 that will be fixed in 2.16.) > > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com > > > On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe > wrote: > >> Hello. >> >> I am trying to spin up the flink runner but looks like data serialization >> is failing. >> I would like to ask for help to get over with this error. >> >> >> [flink-runner-job-invoker] ERROR >> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error >> during job invocation >> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016. 
>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>> at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>> at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>> at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>> at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>> at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>> at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>> at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
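For later readers, here is the fix above folded into the minimal failing pipeline from the quoted message — a sketch assuming Beam 2.15.0 and a Flink master reachable at localhost:8081.

===
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    # Workaround for the 2.15.0 FAILED_TO_UNCOMPRESS issue discussed above;
    # reportedly unnecessary from 2.16 onward.
    "--experiments=beam_fn_api",
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["Hello World"])
===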
Flink Runner logging FAILED_TO_UNCOMPRESS
Hello. I am trying to spin up the flink runner but it looks like data serialization is failing. I would like to ask for help getting past this error.

[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
java.lang.IllegalArgumentException: unable to deserialize BoundedSource
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
... 13 more

My beam version is below.

===
(python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
apache-beam==2.15.0
===

I have my harness container ready on the registry.

===
ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
NAME          DESCRIPTION  STARS  OFFICIAL  AUTOMATED
beam/python3               0
===

Flink is ready on a separate cluster.

===
(python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
tcp  LISTEN  0  128  :::8081  :::*
===

My debian version.

===
(python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
9.11
===

My code snippet is below.

===
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081"
])

with beam.Pipeline(options=options) as p:
    (p | beam.Create(["Hello World"]))
===

Are there any other settings I should look for?
Thanks,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>
Re: How do you write portable runner pipeline on separate python code ?
Lukasz

Thank you for the reply.

> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local" writes appear outside the container
> * Using a non-docker environment such as external or process.

Understood.

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 2:34 AM Lukasz Cwik wrote:

> When you use a local filesystem path and a docker environment, "/tmp" is
> written inside the container. You can solve this issue by:
> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local"
> writes appear outside the container
> * Using a non-docker environment such as external or process.
>
> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe wrote:
>
>> Hello.
>>
>> I would like to ask for help with my sample code using the portable runner
>> with Apache Flink. I was able to work out the wordcount.py using this page.
>>
>> https://beam.apache.org/roadmap/portability/
>>
>> I got the below two files under /tmp.
>>
>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56 py-wordcount-direct-1-of-2
>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56 py-wordcount-direct-0-of-2
>>
>> Then I wrote sample code with the steps below.
>>
>> 1. Install apache_beam using pip3, separate from the source code directory.
>> 2. Wrote sample code as below and named it "test-portable-runner.py".
>> Placed it in a separate directory from the source code.
>>
>> ---
>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>> total 16
>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
>> -rw-r--r-- 1 ywatanabe ywatanabe 634 Sep 12 20:25 test-portable-runner.py
>> ---
>>
>> 3. Executed the code with "python3 test-portable-runner.py"
>>
>> ==
>> #!/usr/bin/env python3
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io import WriteToText
>>
>>
>> def printMsg(line):
>>
>>     print("OUTPUT: {0}".format(line))
>>
>>     return line
>>
>> options = PipelineOptions(["--runner=PortableRunner",
>>                            "--job_endpoint=localhost:8099",
>>                            "--shutdown_sources_on_final_watermark"])
>>
>> p = beam.Pipeline(options=options)
>>
>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>             | beam.Map(printMsg))
>>
>> output | 'write' >> WriteToText('/tmp/sample.txt')
>> ===
>>
>> The job seemed to go all the way to the "FINISHED" state.
>>
>> ---
>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to DEPLOYING.
>> [flink-akka.actor.default-dispat
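To make the first of Lukasz's options concrete, here is a sketch of the same pipeline writing to a remote filesystem instead of a container-local /tmp. The gs:// bucket name is a placeholder, and it assumes the workers have credentials for it; an HDFS or S3 path works the same way.

===
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
])

with beam.Pipeline(options=options) as p:
    (p
     | 'create' >> beam.Create(["a", "b", "c"])
     # A remote path is visible outside the SDK harness container,
     # unlike /tmp, which lives and dies with the container.
     | 'write' >> WriteToText('gs://my-bucket/beam-output/sample'))
===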
Re: How do you write portable runner pipeline on separate python code ?
Kyle

Thank you for the advice.

> For example, Yu's pipeline errored here because the expected Docker container wasn't built before running.

I was able to spin up the harness container and submit the job to the job service by preparing the container properly. I needed to do extra steps beyond the online instructions. Most of this you probably already know; the steps marked (*) below are what I added.

https://beam.apache.org/documentation/runners/flink/

Executing a Beam pipeline on a Flink Cluster

1. Only required once: Build the SDK harness container (optionally replace py35 with the Python version of your choice): ./gradlew :sdks:python:container:py35:docker
*2. Prepare a bintray account (https://bintray.com/)
*3. Push the image to the bintray registry using "docker push" (mentioned here => https://github.com/apache/beam/blob/release-2.15.0/sdks/CONTAINERS.md)
*4. Log in to the bintray account with "docker login"
5. Start the JobService endpoint: ./gradlew :runners:flink:1.5:job-server:runShadow
The JobService is the central instance where you submit your Beam pipeline to. The JobService will create a Flink job for the pipeline and execute the job. To execute the job on a Flink cluster, the Beam JobService needs to be provided with the Flink JobManager address.
6. Submit the Python pipeline to the above endpoint by using the PortableRunner and job_endpoint set to localhost:8099 (this is the default address of the JobService). For example:

====

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 5:09 AM Kyle Weaver wrote:

> +dev I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with the similar optimization for local
>> docker).
>>
>> At the very least, however, obvious messaging when the local filesystem
>> is used from within docker, which is often a (non-obvious and hard to
>> debug) mistake, should be added.
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask for help with my sample code using the portable runner
>>>> with Apache Flink.
>>>> I was able to work out the wordcount.py using this page.
>>>>
>>>> https://beam.apache.org/roadmap/portability/
>>>>
>>>> I got the below two files under /tmp.
>>>>
>>>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56 py-wordcount-direct-1-of-2
>>>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56 py-wordcount-direct-0-of-2
>>>>
>>>> Then I wrote sample code with the steps below.
>>>>
>>>> 1. Install apache_beam using pip3, separate from the source code directory.
>>>> 2. Wrote sample code as below and named it "test-portable-runner.py".
>>>> Placed it in a separate directory from the source code.
>>>>
>>>> ---
Re: How to buffer events using spark portable runner ?
Lukasz

Thank you for the reply. I will try Apache Flink.

Thanks,
Yu

On Sun, Sep 8, 2019 at 11:59 PM Lukasz Cwik wrote:

> Try using Apache Flink.
>
> On Sun, Sep 8, 2019 at 6:23 AM Yu Watanabe wrote:
>
>> Hello.
>>
>> I would like to ask a question related to timely processing as stated in
>> the below page.
>>
>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> Python version: 3.7.4
>> apache beam version: 2.15.0
>>
>> I currently use timely processing to first buffer events and send *bulk
>> requests* to elasticsearch. The source of data is a bounded source and I
>> use DirectRunner for the runner.
>>
>> To have more memory resources, I am considering moving the
>> pipeline to apache spark using the portable runner. However, according to the
>> compatibility matrix, *Timers* are not supported on apache spark.
>>
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-when
>>
>> Is there any way in the portable runner to do processing similar to *timely
>> processing*?
>> This is my first time using the portable runner and I would appreciate
>> help with this.
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.ten...@gmail.com
>> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
>> Twitter: <https://twitter.com/yuwtennis>

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>
How do you write portable runner pipeline on separate python code ?
Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam. 'Some syntactic constructs of Python 3 are not yet fully supported by '
ERROR:root:java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'. stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
docker: Error response from daemon: unknown: Subject ywatanabe was not found. See 'docker run --help'.

Traceback (most recent call last):
  File "test-portable-runner.py", line 27, in <module>
    result.wait_until_finish()
  File "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 446, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9 failed in state FAILED: java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'. stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
docker: Error response from daemon: unknown: Subject ywatanabe was not found. See 'docker run --help'.
---

As a result, I got nothing under /tmp. The code works when using DirectRunner.

May I ask, where should I look in order to get the pipeline to write results to text files under /tmp?

Best Regards,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>
How to buffer events using spark portable runner ?
Hello.

I would like to ask a question related to timely processing as stated in the below page.

https://beam.apache.org/blog/2017/08/28/timely-processing.html

Python version: 3.7.4
apache beam version: 2.15.0

I currently use timely processing to first buffer events and send *bulk requests* to elasticsearch. The source of data is a bounded source and I use DirectRunner for the runner.

To have more memory resources, I am considering moving the pipeline to apache spark using the portable runner. However, according to the compatibility matrix, *Timers* are not supported on apache spark.

https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-when

Is there any way in the portable runner to do processing similar to *timely processing*?
This is my first time using the portable runner and I would appreciate help with this.

Best Regards,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>
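For readers looking for the buffering pattern being described, below is a minimal sketch using state and timers in the Python SDK. It assumes a keyed PCollection of (key, string) pairs, and send_bulk_to_elasticsearch is a hypothetical placeholder for the actual bulk call. As the thread notes, whether this runs depends on the runner's state/timer support, which was the sticking point here.

===
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, TimerSpec, on_timer)


class BufferForBulkFn(beam.DoFn):
    """Buffers string values per key/window, flushing once at the watermark."""
    BUFFER = BagStateSpec('buffer', StrUtf8Coder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self,
                element,  # (key, value); state/timers require keyed input
                window=beam.DoFn.WindowParam,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        buffer.add(element[1])
        flush.set(window.end)  # fire once the window's watermark passes

    @on_timer(FLUSH)
    def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
        batch = list(buffer.read())
        buffer.clear()
        # send_bulk_to_elasticsearch(batch)  # hypothetical bulk call
        yield len(batch)
===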
Which memory profiler is used for python 3.7.3 ?
Hello.

The built-in memory profiler uses guppy, but unfortunately guppy is not supported on python3.

https://beam.apache.org/releases/pydoc/2.13.0/_modules/apache_beam/utils/profiler.html

    try:
      from guppy import hpy  # pylint: disable=import-error
      self._hpy = hpy
      self._interval_second = interval_second
      self._timer = None
    except ImportError:
      warnings.warn('guppy is not installed; MemoryReporter not available.')
      self._hpy = None
      self._enabled = False

I am thinking of using memory_profiler, but I was wondering which memory profiler users of apache beam use for python 3.7.3?

https://pypi.org/project/memory-profiler/

Thanks,
Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>
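As a starting point, here is a minimal sketch of the memory-profiler approach mentioned above: memory_usage can sample a single function call without decorating pipeline code. The process_batch function is a stand-in for whatever transform logic you want to measure.

===
from memory_profiler import memory_usage


def process_batch(batch):
    # Stand-in for the per-bundle work you want to measure.
    return [item.upper() for item in batch]


# Sample memory every 0.1s while the call runs; returns readings in MiB.
readings = memory_usage((process_batch, (["a", "b", "c"],), {}), interval=0.1)
print("peak memory: %.1f MiB" % max(readings))
===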
Re: Any way to profile speed for each transforms ?
Robert

Thank you for the information. I installed apache beam using pip3, so the beam environment is not a problem in my case.

The solution looks fantastic; I love it. Below is output from my sample code.

2019-07-25 10:33:18,722 INFO Test started at 2019-07-25 10:33:18
2019-07-25 10:33:19,780 INFO Filesystem() done. Elapsed time 1.058 ms.
2019-07-25 10:33:31,479 INFO
2019-07-25 10:33:31,479 INFO
2019-07-25 10:33:31,480 INFO
2019-07-25 10:33:31,480 INFO
2019-07-25 10:33:31,481 INFO
2019-07-25 10:33:31,481 INFO
2019-07-25 10:33:31,482 INFO
2019-07-25 10:33:31,482 INFO
2019-07-25 10:33:31,483 INFO
2019-07-25 10:33:31,484 INFO
2019-07-25 10:33:31,484 INFO
2019-07-25 10:33:31,484 INFO
2019-07-25 10:33:31,485 INFO Running ((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_FlatMap()_4))+(ref_AppliedPTransform_ParDo(OpenFn)_5)
2019-07-25 10:33:31,485 INFO Running ((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_FlatMap()_4))+(ref_AppliedPTransform_ParDo(OpenFn)_5)
{'((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_FlatMap()_4))+(ref_AppliedPTransform_ParDo(OpenFn)_5)': ptransforms {
  key: "Create/Read"
  value {
    processed_elements {
      measured {
        output_element_counts {
          key: "out"
          value: 1
        }
        total_time_spent: 0.330032672
      }
    }
  }
}

Thanks,
Yu

On Thu, Jul 25, 2019 at 12:26 AM Robert Bradshaw wrote:

> Also, take note that these counters will only be available if Beam has
> been compiled with Cython (e.g. installed from a wheel). Of course if you
> care about performance you'd want that anyway.
>
> On Wed, Jul 24, 2019, 5:15 PM Robert Bradshaw wrote:
>
>> Beam tracks the amount of time spent in each transform in profile
>> counters. There is ongoing work to expose these in a uniform way for all
>> runners (e.g. in Dataflow they're displayed on the UI), but for the direct
>> runner you can see an example at
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L1046
>> For a raw dump you could do something like:
>>
>> p = beam.Pipeline(...)
>> p | beam.Read...
>> results = p.run()
>> results.wait_until_finish()
>> import pprint
>> pprint.pprint(results._metrics_by_stage)
>>
>> On Wed, Jul 24, 2019 at 4:07 PM Yu Watanabe wrote:
>>
>>> Hello.
>>>
>>> I have a pipeline built on apache beam 2.13.0 using python 3.7.3.
>>> My pipeline takes about 5 hours to ingest 2 sets of approximately 7
>>> Json objects using DirectRunner.
>>>
>>> I want to diagnose which transforms are taking time and improve the code
>>> for better performance. I saw the module below for profiling, but it seems it
>>> does not report the speed of each transform.
>>>
>>> https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.utils.profiler.html
>>>
>>> Is there any module that can be used to monitor the speed of each
>>> transform? If not, I would appreciate some help with how to monitor
>>> the speed of each transform.
>>>
>>> Best Regards,
>>> Yu Watanabe
>>>
>>> --
>>> Yu Watanabe
>>> Weekend Freelancer who loves to challenge building data platform
>>> yu.w.ten...@gmail.com
>>> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
>>> Twitter: <https://twitter.com/yuwtennis>

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>
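A related approach that goes through the public API rather than the private _metrics_by_stage attribute: instrument a DoFn with a user-defined metrics distribution and query it after the run. This is a sketch; the DoFn body is a placeholder for real per-element work.

===
import time

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter


class TimedDoFn(beam.DoFn):
    def __init__(self):
        # User-defined distribution; aggregated across all workers.
        self.latency_ms = Metrics.distribution(
            self.__class__, 'process_latency_ms')

    def process(self, element):
        start = time.time()
        result = element  # placeholder for the real per-element work
        self.latency_ms.update(int((time.time() - start) * 1000))
        yield result


p = beam.Pipeline()
p | beam.Create([1, 2, 3]) | beam.ParDo(TimedDoFn())
result = p.run()
result.wait_until_finish()

# Query the distribution (min/max/mean/count) after the run.
for dist in result.metrics().query(
        MetricsFilter().with_name('process_latency_ms'))['distributions']:
    print(dist)
===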