Re: Running Python Wordcount issues

2019-09-16 Thread Benjamin Tan
I can confirm. I used the master branch to get this to work. I tried the 
"release-2.16.0" branch but that didn't work either.

On 2019/09/16 23:30:54, Tom Barber  wrote: 
> Hello folks,
> 
> Trying to get started running the python word count example against the
> portable runner using the beam 2.15 download.
> 
> In one terminal I have:
> 
> gradle :runners:spark:job-server:runShadow
> 
> And another:
> 
> gradle portableWordCount -PjobEndpoint=localhost:8099
> -PenvironmentType=LOOPBACK
> 
> 
> But when it starts it says:
> 
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> "__main__", fname, loader, pkg_name)
>   File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
> exec code in run_globals
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> line 135, in 
> run()
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> line 115, in run
> result.wait_until_finish()
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",
> line 446, in wait_until_finish
> self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline
> BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
> failed in state FAILED: java.lang.ClassCastException:
> org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
> org.apache.beam.sdk.coders.KvCoder
> 
> And I have no clue where to start looking to fix that as I’d assume the
> demos should just run?
> 
> Tom
> 
> -- 
> 
> 
> Spicule Limited is registered in England & Wales. Company Number: 
> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston 
> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
> 
> 
> 
> 
> All engagements 
> are subject to Spicule Terms and Conditions of Business. This email and its 
> contents are intended solely for the individual to whom it is addressed and 
> may contain information that is confidential, privileged or otherwise 
> protected from disclosure, distributing or copying. Any views or opinions 
> presented in this email are solely those of the author and do not 
> necessarily represent those of Spicule Limited. The company accepts no 
> liability for any damage caused by any virus transmitted by this email. If 
> you have received this message in error, please notify us immediately by 
> reply email before deleting it from your system. Service of legal notice 
> cannot be effected on Spicule Limited by email.
> 


Re: How to use google container registry for FlinkRunner ?

2019-09-16 Thread Benjamin Tan
Something like this:

options = PipelineOptions(["--runner=PortableRunner",
                           "--environment_config=apachebeam/python3.7_sdk",  # <---
                           "--job_endpoint=dnnserver2:8099"])

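A slightly fuller sketch of the same idea, in case it helps. `portable_args` is a hypothetical helper (not part of Beam) that only assembles the flag list; the flags are the ones used in this thread, and the image name is the placeholder from the original question. I am assuming the DOCKER environment type is what makes the job server pull the image named in `--environment_config`.

```python
def portable_args(job_endpoint, image=None, environment_type="DOCKER"):
    """Assemble PortableRunner flags (hypothetical helper, not a Beam API)."""
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=" + environment_type,
    ]
    if image:
        # Strip stray whitespace so the image reference stays valid.
        args.append("--environment_config=" + image.strip())
    return args


# Placeholder image name taken from the question; substitute your own project.
args = portable_args("dnnserver2:8099",
                     image="asia.gcr.io/PROJECTNAME/beam/python3 ")
print(args)

# With apache_beam installed, the list can be passed straight through:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(args)
```

The helper strips whitespace on purpose: a trailing space inside the quoted flag is easy to miss and ends up as part of the image name.
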
On 2019/09/17 02:14:00, Kyle Weaver  wrote: 
> I think environment_config is the option you are looking for.
> 
> On Mon, Sep 16, 2019 at 7:06 PM Yu Watanabe  wrote:
> 
> > Hello.
> >
> > For testing, I would like to use an image uploaded to Google Container
> > Registry.
> > How can I use images pulled from a registry other than bintray.io?
> >
> >
> > 
> > admin@ip-172-31-9-89:~$ gcloud container images list --repository
> > asia.gcr.io/[PROJECTNAME] 
> > NAME
> > asia.gcr.io/[PROJECTNAME]/beam 
> >
> > 
> >
> > It looks like FlinkRunner (2.15.0) uses the bintray repository by default.
> > As a result I am not able to use my own image:
> >
> > Caused by: java.lang.Exception: The user defined 'open()' method caused an
> > exception: java.io.IOException: Received exit code 125 for command 'docker
> > run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm
> > ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1
> > --logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681
> > --provision_endpoint=localhost:45089 --control_endpoint=localhost:37045'.
> > stderr: Unable to find image '
> > ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
> > Error response from daemon: manifest for
> > ywatanabe-docker-apache.bintray.io/beam/python3:latest not found:
> > manifest unknown: The named manifest is not known to the registry.See
> > 'docker run --
> >
> > According to the online doc, there is no parameter that controls which
> > image to use.
> >
> > https://beam.apache.org/documentation/runners/flink/
> >
> > The pipeline options I am using are below.
> >
> >
> > 
> > options = PipelineOptions([
> >   "--runner=FlinkRunner",
> >   "--flink_version=1.8",
> >   "--flink_master_url=localhost:8081",
> >   "--worker_harness_container_image=asia.gcr.io/PROJECTNAME/beam/python3",
> >   "--experiments=beam_fn_api"
> >   ])
> >
> > 
> >
> > Is there perhaps an environment variable to specify which image to use?
> >
> > Best Regards,
> > Yu Watanabe
> >
> > --
> > Yu Watanabe
> > Weekend Freelancer who loves to challenge building data platform
> > yu.w.ten...@gmail.com
> >
> -- 
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> 


How to use the loopback?

2019-09-16 Thread Benjamin Tan
I'm trying to use the loopback via the environment_type option:

options = PipelineOptions(["--runner=PortableRunner",
                           "--environment_config=-apachebeam/python3.7_sdk ",
                           "--environment_type=LOOPBACK",
                           "--job_endpoint=dnnserver2:8099"])

Previously, I've done:

./gradlew -p sdks/python/container buildAll

And ran the Spark job server:

./gradlew :runners:spark:job-server:runShadow 
-PsparkMasterUrl=spark://dnnserver2:7077

However, I get a pretty cryptic error message:

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: 
UNIMPLEMENTED: Method not found!

Any ideas?
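
In case it helps anyone reproduce this, here is a small sketch for sanity-checking the flag list before submitting. It rests on my assumption (not confirmed in this thread) that with LOOPBACK the SDK starts the workers in the submitting process, so a container image passed via `--environment_config` would not be used. `parse_flags` and `loopback_warnings` are hypothetical helpers, not Beam APIs.

```python
def parse_flags(args):
    """Turn ['--key=value', ...] into a dict; flags without '=' map to True."""
    flags = {}
    for arg in args:
        key, sep, value = arg.lstrip("-").partition("=")
        flags[key] = value if sep else True
    return flags


def loopback_warnings(args):
    """Return human-readable warnings for suspicious LOOPBACK flag combos."""
    flags = parse_flags(args)
    warnings = []
    if flags.get("environment_type") == "LOOPBACK" and "environment_config" in flags:
        # Assumption: LOOPBACK does not need a container image at all.
        warnings.append(
            "environment_config is probably unused with LOOPBACK: %r"
            % flags["environment_config"])
    return warnings


# The flag list from this message, unchanged.
thread_args = ["--runner=PortableRunner",
               "--environment_config=-apachebeam/python3.7_sdk ",
               "--environment_type=LOOPBACK",
               "--job_endpoint=dnnserver2:8099"]
for warning in loopback_warnings(thread_args):
    print(warning)
```

This only looks at the flags. It says nothing about the UNIMPLEMENTED error itself.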




Re: How to use the loopback?

2019-09-16 Thread Benjamin Tan
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:216)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:141)
at 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:226)
at 
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:113)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:179)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:163)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 54 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at 
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at 
org.apache.beam.runners.spark.translation.BoundedDataset.getBytes(BoundedDataset.java:76)
at 
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.broadcastSideInput(SparkBatchPortablePipelineTranslator.java:335)
at 
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.translateExecutableStage(SparkBatchPortablePipelineTranslator.java:223)
at 
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.translate(SparkBatchPortablePipelineTranslator.java:137)
at 
org.apache.beam.runners.spark.SparkPipelineRunner.lambda$run$1(SparkPipelineRunner.java:97)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
19/09/17 12:57:15 INFO BeamFileSystemArtifactRetrievalService: Manifest at 
/tmp/beam-artifact-staging/job_ce990c45-c56e-401e-aad6-fd72480e1050/MANIFEST 
has 0 artifact locations
19/09/17 12:57:15 INFO BeamFileSystemArtifactStagingService: Removed dir 
/tmp/beam-artifact-staging/job_ce990c45-c56e-401e-aad6-fd72480e1050/



On 2019/09/17 03:50:06, Kyle Weaver  wrote: 
> Could you share more of the stack trace?
> 
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> 
> 
> On Mon, Sep 16, 201

Re: How to use the loopback?

2019-09-16 Thread Benjamin Tan
unners/worker/worker_pool_main.py",
 line 126, in StopWorker
worker_process = self._worker_processes.pop(stop_worker_request.worker_id)
KeyError: u'24-1'
ERROR:grpc._server:Exception calling application: u'27-1'
Traceback (most recent call last):
  File 
"/home/benjamintan/miniconda3/envs/apbeam-playground/lib/python2.7/site-packages/grpc/_server.py",
 line 434, in _call_behavior
response_or_iterator = behavior(argument, context)
  File 
"/home/benjamintan/miniconda3/envs/apbeam-playground/lib/python2.7/site-packages/apache_beam/runners/worker/worker_pool_main.py",
 line 126, in StopWorker
worker_process = self._worker_processes.pop(stop_worker_request.worker_id)
KeyError: u'27-1'
ERROR:grpc._server:Exception calling application: u'26-1'
Traceback (most recent call last):
  File 
"/home/benjamintan/miniconda3/envs/apbeam-playground/lib/python2.7/site-packages/grpc/_server.py",
 line 434, in _call_behavior
response_or_iterator = behavior(argument, context)
  File 
"/home/benjamintan/miniconda3/envs/apbeam-playground/lib/python2.7/site-packages/apache_beam/runners/worker/worker_pool_main.py",
 line 126, in StopWorker
worker_process = self._worker_processes.pop(stop_worker_request.worker_id)
KeyError: u'26-1'


On 2019/09/17 04:57:55, Benjamin Tan  wrote: 
> Here you go! 
> 
> builder@dnnserver2:~/beam (release-2.16.0) $ ./gradlew 
> :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://dnnserver2:7077
> Configuration on demand is an incubating feature.
> 
> > Task :runners:spark:job-server:runShadow
> Listening for transport dt_socket at address: 5005
> log4j:WARN No appenders could be found for logger 
> (org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 19/09/17 12:57:06 INFO SparkContext: Running Spark version 2.4.4
> 19/09/17 12:57:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 19/09/17 12:57:06 INFO SparkContext: Submitted application: 
> BeamApp-builder-0917045705-aa3391a3_96f998c3-3c0d-42f1-885c-f1e2538310bc
> 19/09/17 12:57:06 INFO SecurityManager: Changing view acls to: builder
> 19/09/17 12:57:06 INFO SecurityManager: Changing modify acls to: builder
> 19/09/17 12:57:06 INFO SecurityManager: Changing view acls groups to:
> 19/09/17 12:57:06 INFO SecurityManager: Changing modify acls groups to:
> 19/09/17 12:57:06 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users  with view permissions: Set(builder); 
> groups with view permissions: Set(); users  with modify permissions: 
> Set(builder); groups with modify permissions: Set()
> 19/09/17 12:57:07 INFO Utils: Successfully started service 'sparkDriver' on 
> port 36069.
> 19/09/17 12:57:07 INFO SparkEnv: Registering MapOutputTracker
> 19/09/17 12:57:07 INFO SparkEnv: Registering BlockManagerMaster
> 19/09/17 12:57:07 INFO BlockManagerMasterEndpoint: Using 
> org.apache.spark.storage.DefaultTopologyMapper for getting topology 
> information
> 19/09/17 12:57:07 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint 
> up
> 19/09/17 12:57:07 INFO DiskBlockManager: Created local directory at 
> /tmp/blockmgr-92f6079e-4a85-4b09-b48b-5d58ddf304a6
> 19/09/17 12:57:07 INFO MemoryStore: MemoryStore started with capacity 1949.1 
> MB
> 19/09/17 12:57:07 INFO SparkEnv: Registering OutputCommitCoordinator
> 19/09/17 12:57:07 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 19/09/17 12:57:07 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
> http://dnnserver2:4040
> 19/09/17 12:57:07 INFO SparkContext: Added JAR 
> /home/builder/beam/runners/spark/job-server/build/install/job-server-shadow/lib/beam-runners-spark-job-server-2.16.0-SNAPSHOT.jar
>  at 
> spark://dnnserver2:36069/jars/beam-runners-spark-job-server-2.16.0-SNAPSHOT.jar
>  with timestamp 1568696227623
> 19/09/17 12:57:07 INFO StandaloneAppClient$ClientEndpoint: Connecting to 
> master spark://dnnserver2:7077...
> 19/09/17 12:57:07 INFO TransportClientFactory: Successfully created 
> connection to dnnserver2/10.64.1.208:7077 after 40 ms (0 ms spent in 
> bootstraps)
> 19/09/17 12:57:07 INFO StandaloneSchedulerBackend: Connected to Spark cluster 
> with app ID app-20190917125707-0066
> 19/09/17 12:57:07 INFO StandaloneAppClient$ClientEndpoint: Executor added: 
> app-20190917125707-0066/0 on worker-20190916143324-10.64.1.208-41823 
> (10.64.1.208:41823) with 12 

Re: Python Portable Runner Issues

2019-09-17 Thread Benjamin Tan

I encountered the exact same thing today. High five! Here’s how I managed to 
make some progress:

1. Used the master branch
2. Built and installed the Python SDK

cd into sdks/python and run python setup.py install

I got some other errors but they didn’t seem to be show stoppers. 

> On 17 Sep 2019, at 8:26 PM, Tom Barber  wrote:
> 
> Hello folks,
> 
> Day 3 of trying to get the basics going with Python & Spark 2.2.3.
> 
> I’ve downgraded the Spark version to 2.2.3 in the Gradle build so that I can 
> run jobs against it.
> 
> I’ve then written this:
> 
> options = PipelineOptions(["--runner=PortableRunner", 
> "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK"])
> 
> with beam.Pipeline(options=options) as p:
>     print("hello")
>     lines = p | 'Create' >> beam.Create(['hello', 'world'])
> 
> Which in turn explodes with:
> 
> Caused by: 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: 
> UNIMPLEMENTED: Method not found!
>   at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:235)
>   at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:216)
>   at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:141)
>   at 
> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:226)
> 
> When run against python 2 and 
> 
> AttributeError: module 'apache_beam.coders.coders' has no attribute 
> 'VarIntCoder'
> 
> When run with python 3.5.
> 
> Should I just give up trying the portable runner at the moment or am I doing 
> something dumb/incompatible? I’m stuck on Spark 2.2.3 so upgrading isn’t 
> currently an option.
> 
> Tom
> 


Re: Python Portable Runner Issues

2019-09-17 Thread Benjamin Tan
If it helps, I’m using Spark 2.4.4. The Apache Beam Python library on master is 
2.17.0-dev. 


> On 17 Sep 2019, at 9:39 PM, Tom Barber  wrote:
> 
> Cool thanks Benjamin, I’ll give it a shot.
> 
> Tom
> 
> 
>> On 17 September 2019 at 13:56:14, Benjamin Tan (benjamintanwei...@gmail.com) 
>> wrote:
>> 
>> 
>> I encountered the exact same thing today. High five! Here’s how I managed to 
>> make some progress:
>> 
>> 1. Used the master branch
>> 2. Built and installed the Python SDK
>> 
>> cd into sdks/python and run python setup.py install
>> 
>> I got some other errors but they didn’t seem to be show stoppers. 
>> 
>> On 17 Sep 2019, at 8:26 PM, Tom Barber  wrote:
>> 
>>> Hello folks,
>>> 
>>> Day 3 of trying to get the basics going with Python & Spark 2.2.3.
>>> 
>>> I’ve downgraded the Spark version to 2.2.3 in the Gradle build so that I 
>>> can run jobs against it.
>>> 
>>> I’ve then written this:
>>> 
>>> options = PipelineOptions(["--runner=PortableRunner", 
>>> "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK"])
>>> 
>>> with beam.Pipeline(options=options) as p:
>>>     print("hello")
>>>     lines = p | 'Create' >> beam.Create(['hello', 'world'])
>>> 
>>> Which in turn explodes with:
>>> 
>>> Caused by: 
>>> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: 
>>> UNIMPLEMENTED: Method not found!
>>> at 
>>> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:235)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:216)
>>> at 
>>> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:141)
>>> at 
>>> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:226)
>>> 
>>> When run against python 2 and 
>>> 
>>> AttributeError: module 'apache_beam.coders.coders' has no attribute 
>>> 'VarIntCoder'
>>> 
>>> When run with python 3.5.
>>> 
>>> Should I just give up trying the portable runner at the moment or am I 
>>> doing something dumb/incompatible? I’m stuck on Spark 2.2.3 so upgrading 
>>> isn’t currently an option.
>>> 
>>> Tom
>>> 
>>> 
> 


Re: Python Portable Runner Issues

2019-09-17 Thread Benjamin Tan
Tell me if you see any output. Anyway, here's the link to the same issue you're 
facing:

https://lists.apache.org/thread.html/4e8e1455916debe096de32551f9ab05853524cf282bc312cd4620d68@%3Cuser.beam.apache.org%3E

The amount of issues I've encountered as a newbie is indeed troubling. 

On 2019/09/17 14:43:11, Tom Barber  wrote: 
> 🤣 okay I’ll look again, I assumed it just crashed in a ball of flames!
> 
> 
> On 17 September 2019 at 15:39:33, Benjamin Tan (benjamintanwei...@gmail.com)
> wrote:
> 
> I got this too!  Did you manage to get any output? (I did) I reported this
> in another thread.
> 
> This looks like a key error when StopWorker is called. As far as I know, it
> seems like the work has been processed.
> 
> On Tue, Sep 17, 2019 at 10:31 PM Tom Barber  wrote:
> 
> > Well my errors are different but still terminal:
> >
> > ERROR:grpc._server:Exception calling application: u'1-1'
> > Traceback (most recent call last):
> >   File "/usr/local/lib/python2.7/dist-packages/grpc/_server.py", line 434,
> > in _call_behavior
> > response_or_iterator = behavior(argument, context)
> >   File
> > "/usr/local/lib/python2.7/dist-packages/apache_beam-2.17.0.dev0-py2.7.egg/apache_beam/runners/worker/worker_pool_main.py",
> > line 126, in StopWorker
> > worker_process =
> > self._worker_processes.pop(stop_worker_request.worker_id)
> > KeyError: u'1-1'
> > ERROR:grpc._server:Exception calling application: u'2-1'
> > Traceback (most recent call last):
> >   File "/usr/local/lib/python2.7/dist-packages/grpc/_server.py", line 434,
> > in _call_behavior
> > response_or_iterator = behavior(argument, context)
> >   File
> > "/usr/local/lib/python2.7/dist-packages/apache_beam-2.17.0.dev0-py2.7.egg/apache_beam/runners/worker/worker_pool_main.py",
> > line 126, in StopWorker
> > worker_process =
> > self._worker_processes.pop(stop_worker_request.worker_id)
> > KeyError: u'2-1'
> >
> >
> > On 17 September 2019 at 14:46:12, Benjamin Tan (
> > benjamintanwei...@gmail.com) wrote:
> >
> > If it helps, I’m using Spark 2.4.4. The Apache Beam Python library on
> > master is 2.17.0-dev.
> >
> >
> > On 17 Sep 2019, at 9:39 PM, Tom Barber  wrote:
> >
> > Cool thanks Benjamin, I’ll give it a shot.
> >
> > Tom
> >
> >
> > On 17 September 2019 at 13:56:14, Benjamin Tan (
> > benjamintanwei...@gmail.com) wrote:
> >
> >
> > I encountered the exact same thing today. High five! Here’s how I managed
> > to make some progress:
> >
> > 1. Used the master branch
> > 2. Built and installed the Python SDK
> >
> > cd into sdks/python and run python setup.py install
> >
> > I got some other errors but they didn’t seem to be show stoppers.
> >
> > On 17 Sep 2019, at 8:26 PM, Tom Barber  wrote:
> >
> > Hello folks,
> >
> > Day 3 of trying to get the basics going with Python & Spark 2.2.3.
> >
> > I’ve downgraded the Spark version to 2.2.3 in the Gradle build so that I
> > can run jobs against it.
> >
> > I’ve then written this:
> >
> > options = PipelineOptions(["--runner=PortableRunner",
> > "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK"])
> >
> > with beam.Pipeline(options=options) as p:
> >     print("hello")
> >     lines = p | 'Create' >> beam.Create(['hello', 'world'])
> >
> > Which in turn explodes with:
> >
> > Caused by:
> > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException:
> > UNIMPLEMENTED: Method not found!
> > at
> > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:235)
> > at
> > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:216)
> > at
> > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:141)
> > at
> > org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:226)
> >
> > When run against python 2 and
> >
> > AttributeError: module 'apache_beam.coders.coders' has no
> > attribute 'VarIntCoder'
> >
> > When run with python 3.5.
> >
> > Should I just give up trying the portable runner at the moment or am I
> > doing something dumb/incompatible? I’m stuck on Spa

Re: Python Portable Runner Issues

2019-09-17 Thread Benjamin Tan
:D. Still, I'm curious as to the error we both are getting. Maybe someone
can shed some light on it.
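
For what it's worth, the traceback quoted below ends in `self._worker_processes.pop(stop_worker_request.worker_id)`. Here is a minimal sketch of that failure mode, using a hypothetical stand-in class rather than the real Beam servicer. It only shows how `dict.pop` behaves when the id is not in the dict. It does not explain why the id is missing in our runs, and the tolerant variant is my own illustration, not necessarily how Beam handles it.

```python
class WorkerPool(object):
    """Hypothetical stand-in for a worker pool; not the Beam class."""

    def __init__(self):
        self._worker_processes = {}

    def start_worker(self, worker_id):
        self._worker_processes[worker_id] = "process-for-" + worker_id

    def stop_worker_strict(self, worker_id):
        # Same call shape as in the traceback: raises KeyError if the id is unknown.
        return self._worker_processes.pop(worker_id)

    def stop_worker_tolerant(self, worker_id):
        # A default makes the stop a no-op for unknown ids.
        return self._worker_processes.pop(worker_id, None)


pool = WorkerPool()
pool.start_worker("1-1")
pool.stop_worker_strict("1-1")

try:
    pool.stop_worker_strict("1-1")  # second stop for the same id
    second_stop = "ok"
except KeyError as err:
    second_stop = "KeyError: %r" % err.args[0]

print(second_stop)
print(pool.stop_worker_tolerant("2-1"))
```

If this is what is going on, the error would be raised during cleanup rather than during processing, which would fit with both of us still getting output.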

On Tue, Sep 17, 2019 at 10:54 PM Tom Barber  wrote:

> I do see hello written to 1 file and world to another, I guess it works!
> Thanks for the pointers Benjamin I was about to give up.
>
> Tom
>
>
> On 17 September 2019 at 15:51:13, Benjamin Tan (
> benjamintanwei...@gmail.com) wrote:
>
> Tell me if you see any output. Anyway, here's the link to the same issue
> you're facing:
>
>
> https://lists.apache.org/thread.html/4e8e1455916debe096de32551f9ab05853524cf282bc312cd4620d68@%3Cuser.beam.apache.org%3E
>
> The amount of issues I've encountered as a newbie is indeed troubling.
>
> On 2019/09/17 14:43:11, Tom Barber  wrote:
> > 🤣 okay I’ll look again, I assumed it just crashed in a ball of flames!
> >
> >
> > On 17 September 2019 at 15:39:33, Benjamin Tan (benjamintanwei...@gmail.com)
> > wrote:
> >
> > I got this too! Did you manage to get any output? (I did) I reported this
> > in another thread.
> >
> > This looks like a key error when StopWorker is called. As far as I know, it
> > seems like the work has been processed.
> >
> > On Tue, Sep 17, 2019 at 10:31 PM Tom Barber  wrote:
> >
> > > Well my errors are different but still terminal:
> > >
> > > > ERROR:grpc._server:Exception calling application: u'1-1'
> > > > Traceback (most recent call last):
> > > > File "/usr/local/lib/python2.7/dist-packages/grpc/_server.py", line 434,
> > > > in _call_behavior
> > > > response_or_iterator = behavior(argument, context)
> > > > File
> > > > "/usr/local/lib/python2.7/dist-packages/apache_beam-2.17.0.dev0-py2.7.egg/apache_beam/runners/worker/worker_pool_main.py",
> > > > line 126, in StopWorker
> > > > worker_process =
> > > > self._worker_processes.pop(stop_worker_request.worker_id)
> > > > KeyError: u'1-1'
> > > > ERROR:grpc._server:Exception calling application: u'2-1'
> > > > Traceback (most recent call last):
> > > > File "/usr/local/lib/python2.7/dist-packages/grpc/_server.py", line 434,
> > > > in _call_behavior
> > > > response_or_iterator = behavior(argument, context)
> > > > File
> > > > "/usr/local/lib/python2.7/dist-packages/apache_beam-2.17.0.dev0-py2.7.egg/apache_beam/runners/worker/worker_pool_main.py",
> > > > line 126, in StopWorker
> > > > worker_process =
> > > > self._worker_processes.pop(stop_worker_request.worker_id)
> > > > KeyError: u'2-1'
> > >
> > >
> > > On 17 September 2019 at 14:46:12, Benjamin Tan (
> > > benjamintanwei...@gmail.com) wrote:
> > >
> > > If it helps, I’m using Spark 2.4.4. The Apache Beam Python library on
> > > master is 2.17.0-dev.
> > >
> > >
> > > On 17 Sep 2019, at 9:39 PM, Tom Barber  wrote:
> > >
> > > Cool thanks Benjamin, I’ll give it a shot.
> > >
> > > Tom
> > >
> > >
> > > On 17 September 2019 at 13:56:14, Benjamin Tan (
> > > benjamintanwei...@gmail.com) wrote:
> > >
> > >
> > > > I encountered the exact same thing today. High five! Here’s how I managed
> > > > to make some progress:
> > >
> > > 1. Used the master branch
> > > 2. Built and installed the Python SDK
> > >
> > > > cd into sdks/python and run python setup.py install
> > >
> > > I got some other errors but they didn’t seem to be show stoppers.
> > >
> > > On 17 Sep 2019, at 8:26 PM, Tom Barber  wrote:
> > >
> > > Hello folks,
> > >
> > > Day 3 of trying to get the basics going with Python & Spark 2.2.3.
> > >
> > > > I’ve downgraded the Spark version to 2.2.3 in the Gradle build so that I
> > > > can run jobs against it.
> > >
> > > I’ve then written this:
> > >
> > > options = PipelineOptions(["--runner=PortableRunner",
> > > "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK"])
> > >
> > > > with beam.Pipeline(options=options) as p:
> > > >     print("hello")
> > > >     lines = p | 'Create' >> beam.Create(['hello', 'world'])
> > >
> > > Which in turn explodes with:
> > >
> > > Caused by:
> > > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException:
> > > UNIMPLEMENTED: Method not found!

Re: Python Portable Runner Issues

2019-09-17 Thread Benjamin Tan
Thanks for all the replies Kyle! You've been super helpful :D.

Would you say that the Flink runner is more stable than the Spark one? Or which 
combo is the most stable for now?
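
On the version point in Kyle's reply quoted below (SDK version has to match the runner version), this is a rough sketch of the kind of check I have in mind before submitting. `major_minor` and `versions_match` are hypothetical helpers. The two version strings are ones that appear in these threads: the egg name from Tom's traceback and the job-server jar name from my Spark log. Comparing only major.minor is my own assumption about what counts as "matching", and with dev/SNAPSHOT builds even that may not be enough.

```python
import re


def major_minor(version):
    """Extract (major, minor) from '2.17.0.dev0' or from a jar file name."""
    match = re.search(r"(\d+)\.(\d+)\.\d+", version)
    if match is None:
        raise ValueError("no version found in %r" % version)
    return int(match.group(1)), int(match.group(2))


def versions_match(sdk_version, runner_artifact):
    """True if the SDK and the runner artifact share major.minor (assumed rule)."""
    return major_minor(sdk_version) == major_minor(runner_artifact)


sdk = "2.17.0.dev0"  # with Beam installed: apache_beam.__version__
jar = "beam-runners-spark-job-server-2.16.0-SNAPSHOT.jar"
print(versions_match(sdk, jar))
```

A mismatch like this one (2.17 SDK against a 2.16 job server) is the situation Kyle describes, so it seems worth ruling out first.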

On 2019/09/17 19:43:54, Tom Barber  wrote: 
> Thanks Kyle,
> 
> From my pov Alpha is fine, I’m just trying to test out some of the
> capabilities currently, but trying to dig around the website doesn’t
> explain a great deal. Luckily Benjamin seems a step ahead of me… I hope it
> stays that way!  ;)
> 
> 
> On 17 September 2019 at 19:33:40, Kyle Weaver (kcwea...@google.com) wrote:
> 
> > The amount of issues I've encountered as a newbie is indeed troubling.
> Spark portability is very much "alpha" quality software, a point we should
> maybe emphasize on the website more. Anyway, I appreciate your patience,
> and I'll do my best to address all these issues.
> 
> > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException:
> UNIMPLEMENTED: Method not found!
> > AttributeError: module 'apache_beam.coders.coders' has no
> attribute 'VarIntCoder'
> This class of errors occurs when the SDK version does not match up with the
> runner version -- unfortunately, we cannot guarantee compatibility between
> the two, so manual syncing is required for now. We are looking for ways to
> improve this.
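
Since the SDK and the runner have to be kept in sync by hand, a quick check before submitting can save a confusing stack trace. The following is only a sketch: the helper names are hypothetical, and comparing the major.minor.patch part of the version string is an assumption about what "matching" means (a dev build from master is not necessarily the same code as the eventual release).

```python
def release_of(version):
    """Return the (major, minor, patch) part of a Beam version string."""
    # "2.17.0.dev0" -> ("2", "17", "0"); suffixes such as ".dev0" are dropped.
    return tuple(version.split(".")[:3])


def versions_match(sdk_version, runner_version):
    """True when the SDK and the job server name the same release."""
    return release_of(sdk_version) == release_of(runner_version)


# The SDK side can be read from apache_beam.__version__; the job-server side
# is whatever release or branch it was built from (typed in by hand here).
print(versions_match("2.15.0", "2.15.0"))       # same release on both sides
print(versions_match("2.17.0.dev0", "2.15.0"))  # dev SDK against a 2.15 job server
```

In the traceback quoted further down, the installed SDK is apache_beam-2.17.0.dev0, so a job server built from the 2.15 release would fail this check.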
> 
> > ERROR:grpc._server:Exception calling application: u'2-1'
> It looks like these errors are entirely spurious. Have a fix for it here:
> https://github.com/apache/beam/pull/9600
> 
> Note that there may still be other spurious errors like:
> 
> 19/09/17 11:25:31 ERROR ManagedChannelOrphanWrapper: *~*~*~ Channel
> ManagedChannelImpl{logId=84, target=localhost:36129} was not shutdown
> properly!!! ~*~*~*
> 
> and
> 
> 19/09/17 11:25:32 ERROR SerializingExecutor: Exception while executing
> runnable
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@47db89b1
> java.lang.IllegalStateException: call already closed
> 
> I will try to get rid of them also, but for now you can just ignore them.
> They are annoying but harmless.
> 
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> 
> 
> On Tue, Sep 17, 2019 at 8:05 AM Benjamin Tan 
> wrote:
> 
> > :D. Still, I'm curious as to the error we both are getting. Maybe someone
> > can shed some light on it.
> >
> > On Tue, Sep 17, 2019 at 10:54 PM Tom Barber  wrote:
> >
> >> I do see hello written to 1 file and world to another, I guess it works!
> >> Thanks for the pointers Benjamin I was about to give up.
> >>
> >> Tom
> >>
> >>
> >> On 17 September 2019 at 15:51:13, Benjamin Tan (
> >> benjamintanwei...@gmail.com) wrote:
> >>
> >> Tell me if you see any output. Anyway, here's the link to the same issue
> >> you're facing:
> >>
> >>
> >> https://lists.apache.org/thread.html/4e8e1455916debe096de32551f9ab05853524cf282bc312cd4620d68@%3Cuser.beam.apache.org%3E
> >>
> >> The amount of issues I've encountered as a newbie is indeed troubling.
> >>
> >> On 2019/09/17 14:43:11, Tom Barber  wrote:
> >> > 🤣 okay I’ll look again, I assumed it just crashed in a ball of flames!
> >> >
> >> >
> >> > On 17 September 2019 at 15:39:33, Benjamin Tan (
> >> benjamintanwei...@gmail.com)
> >> > wrote:
> >> >
> >> > I got this too! Did you manage to get any output? (I did) I reported
> >> this
> >> > in another thread.
> >> >
> >> > This looks like a key error when StopWorker is called. As far as I
> >> know, it
> >> > seems like the work has been processed.
> >> >
> >> > On Tue, Sep 17, 2019 at 10:31 PM Tom Barber  wrote:
> >> >
> >> > > Well my errors are different but still terminal:
> >> > >
> >> > > ERROR:grpc._server:Exception calling application: u'1-1'
> >> > > Traceback (most recent call last):
> >> > > File "/usr/local/lib/python2.7/dist-packages/grpc/_server.py", line
> >> 434,
> >> > > in _call_behavior
> >> > > response_or_iterator = behavior(argument, context)
> >> > > File
> >> > >
> >> "/usr/local/lib/python2.7/dist-packages/apache_beam-2.17.0.dev0-py2.7.egg/apache_beam/runners/worker/worker_pool_main.py",
> >> > > line 126, in StopWorker
> >> > > worker_process =
> >> > > se

Re: Word-count example

2019-09-17 Thread Benjamin Tan
Could you try adding "--experiments=beam_fn_api" to the runner options? Kyle 
did mention that it should be fixed in 2.16.0.

On 2019/09/17 16:40:41, Matthew Patterson  wrote: 
> Folks,
> 
> Am sure I am doing it wrong, but have been beating head against wall long 
> enough I need to ask for help.
> 
> My goal is to use the python SDK to run jobs on AWS flink cluster, but very 
> little is working, so I am trying to back down to the very simplest thing 
> that works, and build back up: not going so well.
> 
> Now I am running, or trying to, on a minikube Flink cluster. Now running from 
> just java/mvn, so I go through the following:
> 
>   1.  bring-up-cluster (see below)
>   2.  grab-the-code (see below)
>   3.  maven-submit (see below)
> 
> Eventual failure is
> 
> “[ERROR] Failed to execute goal 
> org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project 
> word-count-beam: An exception occured while executing the Java class. 
> Pipeline execution failed: Could not retrieve the execution result. (JobID: 
> 6edad04faa43fbf22bdea24171ba9d59): Failed to submit JobGraph. [Failed to 
> deserialize JobGraph.]”
> 
> and log is attached (I hope)
> 
> Other details:
> 
> on osx
> 
> 
> 
> ➜ minikube version
> 
> minikube version: v1.3.1
> 
> commit: ca60a424ce69a4d79f502650199ca2b52f29e631
> 
> 
> ➜ kubectl version
> 
> Client Version: version.Info{Major:"1", Minor:"14", GitVersion:"v1.14.6", 
> GitCommit:"96fac5cd13a5dc064f7d9f4f23030a6aeface6cc", GitTreeState:"clean", 
> BuildDate:"2019-08-19T11:13:49Z", GoVersion:"go1.12.9", Compiler:"gc", 
> Platform:"darwin/amd64"}
> 
> Server Version: version.Info{Major:"1", Minor:"15", GitVersion:"v1.15.2", 
> GitCommit:"f6278300bebbb750328ac16ee6dd3aa7d3549568", GitTreeState:"clean", 
> BuildDate:"2019-08-05T09:15:22Z", GoVersion:"go1.12.5", Compiler:"gc", 
> Platform:"linux/amd64"}
> 
> 
> ➜ flink --version
> 
> Version: 1.8.1, Commit ID: 7297bac
> 
> ➜ python -c"import apache_beam; print(apache_beam.__version__)"
> 2.15.0
> 
> ➜ curl localhost:8081
> [HTML of the Apache Flink Web Dashboard page; markup not preserved]
> 
> Thanks for any ideas,
> Matt
> 
> >>> bring-up-cluster
> 
> # [yamls from 
> here](https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html)
> minikube start
> minikube ssh 'sudo ip link set docker0 promisc on'
> 
> kubectl create -f flink-configuration-configmap.yaml
> kubectl create -f jobmanager-service.yaml
> kubectl create -f jobmanager-deployment.yaml
> kubectl create -f taskmanager-deployment.yaml
> kubectl apply -f jobmanager-rest-service.yaml
> 
> >>> grab-the-code
> mvn archetype:generate \
> -DarchetypeGroupId=org.apache.beam \
> -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> -DarchetypeVersion=2.15.0 \
> -DgroupId=org.example \
> -DartifactId=word-count-beam \
> -Dversion="0.1" \
> -Dpackage=org.apache.beam.examples \
> -DinteractiveMode=false
> 
> >>> maven-submit
> 
> # kubectl port-forward svc/flink-jobmanager-rest 8081:8081 # in separate 
> shell, as blocking
> mvn package exec:java -X 
> -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
>  -Dexec.args="--runner=FlinkRunner\
>   --flinkMaster="localhost:8081"\
>   --filesToStage=target/word-count-beam-bundled-0.1.jar\
>   --output=" -Pflink-runner
> 
> 
> 


Re: Word-count example

2019-09-17 Thread Benjamin Tan
Ah. So maybe try 2.16.0? 

On 2019/09/17 23:47:16, Kyle Weaver  wrote: 
> --experiments=beam_fn_api doesn't apply here, as this is a Java pipeline
> using the non-portable version of the Flink runner.
> 
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> 
> 
> On Tue, Sep 17, 2019 at 4:41 PM Benjamin Tan 
> wrote:
> 
> > Could you try adding "--experiments=beam_fn_api" to the runner options?
> > Kyle did mention that it should be fixed in 2.16.0.
> >
> > On 2019/09/17 16:40:41, Matthew Patterson 
> > wrote:
> > > Folks,
> > >
> > > Am sure I am doing it wrong, but have been beating head against wall
> > long enough I need to ask for help.
> > >
> > > My goal is to use the python SDK to run jobs on AWS flink cluster, but
> > very little is working, so I am trying to back down to the very simplest
> > thing that works, and build back up: not going so well.
> > >
> > > Now I am running, or trying to, on a minikube Flink cluster. Now running
> > from just java/mvn, so I go through the following:
> > >
> > >   1.  bring-up-cluster (see below)
> > >   2.  grab-the-code (see below)
> > >   3.  maven-submit (see below)
> > >
> > > Eventual failure is
> > >
> > > “[ERROR] Failed to execute goal
> > org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project
> > word-count-beam: An exception occured while executing the Java class.
> > Pipeline execution failed: Could not retrieve the execution result. (JobID:
> > 6edad04faa43fbf22bdea24171ba9d59): Failed to submit JobGraph. [Failed to
> > deserialize JobGraph.]”
> > >
> > > and log is attached (I hope)
> > >
> > > Other details:
> > >
> > > on osx
> > >
> > >
> > >
> > > ➜ minikube version
> > >
> > > minikube version: v1.3.1
> > >
> > > commit: ca60a424ce69a4d79f502650199ca2b52f29e631
> > >
> > >
> > > ➜ kubectl version
> > >
> > > Client Version: version.Info{Major:"1", Minor:"14",
> > GitVersion:"v1.14.6", GitCommit:"96fac5cd13a5dc064f7d9f4f23030a6aeface6cc",
> > GitTreeState:"clean", BuildDate:"2019-08-19T11:13:49Z",
> > GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
> > >
> > > Server Version: version.Info{Major:"1", Minor:"15",
> > GitVersion:"v1.15.2", GitCommit:"f6278300bebbb750328ac16ee6dd3aa7d3549568",
> > GitTreeState:"clean", BuildDate:"2019-08-05T09:15:22Z",
> > GoVersion:"go1.12.5", Compiler:"gc", Platform:"linux/amd64"}
> > >
> > >
> > > ➜ flink --version
> > >
> > > Version: 1.8.1, Commit ID: 7297bac
> > >
> > > ➜ python -c"import apache_beam; print(apache_beam.__version__)"
> > > 2.15.0
> > >
> > > ➜ curl localhost:8081
> > > [HTML of the Apache Flink Web Dashboard page; markup not preserved]
> > >
> > > Thanks for any ideas,
> > > Matt
> > >
> > > >>> bring-up-cluster
> > >
> > > # [yamls from here](
> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
> > )
> > > minikube start
> > > minikube ssh 'sudo ip link set docker0 promisc on'
> > >
> > > kubectl create -f flink-configuration-configmap.yaml
> > > kubectl create -f jobmanager-service.yaml
> > > kubectl create -f jobmanager-deployment.yaml
> > > kubectl create -f taskmanager-deployment.yaml
> > > kubectl apply -f jobmanager-rest-service.yaml
> > >
> > > >>> grab-the-code
> > > mvn archetype:generate \
> > > -DarchetypeGroupId=org.apache.beam \
> > > -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> > > -DarchetypeVersion=2.15.0 \
> > > -DgroupId=org.example \
> > > -DartifactId=word-count-beam \
> > > -Dversion="0.1" \
> > > -Dpackage=org.apache.beam.examples \
> > > -DinteractiveMode=false
> > >
> > > >>> maven-submit
> > >
> > > # kubectl port-forward svc/flink-jobmanager-rest 8081:8081 # in separate
> > shell, as blocking
> > > mvn package exec:java -X
> > -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
> > >  -Dexec.args="--runner=FlinkRunner\
> > >   --flinkMaster="localhost:8081"\
> > >   --filesToStage=target/word-count-beam-bundled-0.1.jar\
> > >   --output=" -Pflink-runner
> > >
> > >
> > >
> >
> 


Does loopback mode work on Spark clusters?

2019-09-17 Thread Benjamin Tan
I'm having connection refused errors, though code on PySpark works on the 
clusters so I'm pretty sure it's not a firewall issue.

So, does loopback mode work on a Spark cluster?


Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-18 Thread Benjamin Tan
Seems like Docker is not installed. Maybe run with PROCESS as the environment 
type? Or install Docker. 

Sent from my iPhone

> On 18 Sep 2019, at 6:40 PM, Yu Watanabe  wrote:
> 
> Hello.
> 
> I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit job to 
> AWS EMR (5.26.0).
> 
> However, I get below error when I run the pipeline and fail.
> 
> -
> Caused by: java.lang.Exception: The user defined 'open()' method caused an 
> exception: java.io.IOException: Cannot run program "docker": error=2, No such 
> file or directory
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> ... 1 more
> Caused by: 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.io.IOException: Cannot run program "docker": error=2, No such file or 
> directory
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
> at 
> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
> at 
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
> at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> ... 3 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such 
> file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141)
> at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
> at 
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ... 11 more
> Caused by: java.io.IOException: error=2, No such file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.(UNIXProcess.java:247)
> at java.lang.ProcessImpl.start(ProcessImpl.java:134)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> ... 24 more
>  -  
> 
> Pipeline options are below.
>  -  
> options = PipelineOptions([
>   "--runner=FlinkRunner",
>   "--flink_version=1.8",
>   
> "--flink_master_url=ip-172-31-1-84.ap-northeast-1.compute.internal:43581",
> 

Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-18 Thread Benjamin Tan
e$Segment.get(LocalCache.java:2044)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ... 11 more
> Suppressed: java.io.IOException: Received exit code 1 for command 
> 'docker kill 
> 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b'. stderr: 
> Error response from daemon: Cannot kill container: 
> 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b: No such 
> container: 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b
> at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:179)
> at 
> org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:134)
> at 
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:174)
> ... 21 more
>   
> =
>   
> 
> Next, I'd like to try using environment type 'PROCESS', but it seems you need 
> an external command for 'environment_config'
> 
> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/portable_runner.py#L138
> 
>   config = json.loads(portable_options.environment_config)
> 
> May I ask what command I need to set as a shell script for each task manager?
> 
> Best Regards,
> Yu Watanabe
> 
>> On Wed, Sep 18, 2019 at 9:39 PM Benjamin Tan  
>> wrote:
>> Seems like docker is not installed. Maybe run with PROCESS as the 
>> environment type ? Or install docker. 
>> 
>> Sent from my iPhone
>> 
>>> On 18 Sep 2019, at 6:40 PM, Yu Watanabe  wrote:
>>> 
>>> Hello.
>>> 
>>> I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit job 
>>> to AWS EMR (5.26.0).
>>> 
>>> However, I get below error when I run the pipeline and fail.
>>> 
>>> -
>>> Caused by: java.lang.Exception: The user defined 'open()' method caused an 
>>> exception: java.io.IOException: Cannot run program "docker": error=2, No 
>>> such file or directory
>>> at 
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
>>> at 
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> ... 1 more
>>> Caused by: 
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>>  java.io.IOException: Cannot run program "docker": error=2, No such file or 
>>> directory
>>> at 
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>>> at 
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
>>> at 
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
>>> at 
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
>>> at 
>>> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
>>> at 
>>> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
>>> at 
>>> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
>>> at 
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>> at 
>>> org.apache.flink.runtime.operators.BatchTask.ru

Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-18 Thread Benjamin Tan
Try this as part of PipelineOptions:

--environment_config={\"command\":\"/opt/apache/beam/boot\"}
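
Because environment_config is parsed as JSON, it can be easier to build the flag in Python than to escape the quotes by hand. A minimal sketch, assuming the boot binary really lives at /opt/apache/beam/boot on every worker; process_environment_flags is a hypothetical helper name, not part of Beam.

```python
import json


def process_environment_flags(boot_path):
    """Build the two portable-runner flags for PROCESS mode."""
    # json.dumps produces the quoting that json.loads expects on the SDK side,
    # so no manual backslash escaping is needed.
    config = json.dumps({"command": boot_path})
    return [
        "--environment_type=PROCESS",
        "--environment_config=" + config,
    ]


flags = process_environment_flags("/opt/apache/beam/boot")
print(flags)
```

The resulting list would be added to the other arguments passed to PipelineOptions([...]), next to --runner and the Flink master address.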

On 2019/09/18 10:40:42, Yu Watanabe  wrote: 
> Hello.
> 
> I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit job
> to AWS EMR (5.26.0).
> 
> However, I get below error when I run the pipeline and fail.
> 
> -
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> ... 1 more
> Caused by:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
> at
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> ... 3 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
> at
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ... 11 more
> Caused by: java.io.IOException: error=2, No such file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.(UNIXProcess.java:247)
> at java.lang.ProcessImpl.start(ProcessImpl.java:134)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> ... 24 more
>  -
> 
> Pipeline options are below.
>  -
> options = PipelineOptions([
>   "--runner=FlinkRunner",
>   "--flink_version=1.8",
> 
> "--flink_master_url=ip-172-31-1-84.ap-northeast-1.compute.internal:43581",
>   "--environment_config=
> asia.gcr.io/PROJECTNAME/beam/python3",
>  

Where is /opt/apache/beam/boot?

2019-09-18 Thread Benjamin Tan
I'm trying to use the process environment_config and I have no idea where 
/opt/apache/beam/boot is. 

Is there a way to generate this?


Re: Where is /opt/apache/beam/boot?

2019-09-18 Thread Benjamin Tan
You are awesome. Thanks! 

On 2019/09/18 15:08:08, Lukasz Cwik  wrote: 
> It is embedded inside the docker container that corresponds to whichever SDK
> you're using.
> 
> Python container boot src:
> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go
> Java container boot src:
> https://github.com/apache/beam/blob/master/sdks/java/container/boot.go
> Go container boot src:
> https://github.com/apache/beam/blob/master/sdks/go/container/boot.go
> 
> On Wed, Sep 18, 2019 at 7:38 AM Benjamin Tan 
> wrote:
> 
> > I'm trying to use the process environment_config and I have no idea where
> > is /opt/apache/beam/boot.
> >
> > Is there a way to generate this?
> >
> 


Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-20 Thread Benjamin Tan
ess to use docker. I would suggest checking if the yarn user on 
>> TaskManagers have permission to use docker.
>> 
>> 
>>> On Wed, Sep 18, 2019 at 11:23 AM Kyle Weaver  wrote:
>>> > Per your suggestion, I read the design doc, and it states that a harness 
>>> > container is a mandatory setting for all TaskManagers.
>>> 
>>> That doc is out of date. As Benjamin said, it's not strictly required any 
>>> more to use Docker. However, it is still recommended, as Docker makes 
>>> managing dependencies a lot easier, whereas PROCESS mode involves managing 
>>> dependencies via shell scripts.
>>> 
>>> > Caused by: java.lang.Exception: The user defined 'open()' method caused 
>>> > an exception: java.io.IOException: Received exit code 1 for command 
>>> > 'docker inspect -f {{.State.Running}} 
>>> > 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b'. 
>>> > stderr: Error: No such object: 
>>> > 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b
>>> 
>>> This means your Docker container is failing to start up for some reason. I 
>>> recommend either a) running the container manually and inspecting the logs, 
>>> or b) you can use the master or Beam 2.16 branches, which have better 
>>> Docker logging (https://github.com/apache/beam/pull/9389).
>>> 
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>> 
>>> 
>>>> On Wed, Sep 18, 2019 at 8:04 AM Yu Watanabe  wrote:
>>>> Thank you for the reply.
>>>> 
>>>> I see files "boot" under below directories. 
>>>> But these seems to be used for containers.
>>>> 
>>>>   (python) admin@ip-172-31-9-89:~/beam-release-2.15.0$ find ./ -name 
>>>> "boot" -exec ls -l {} \;
>>>> lrwxrwxrwx 1 admin admin 23 Sep 16 23:43 
>>>> ./sdks/python/container/.gogradle/project_gopath/src/github.com/apache/beam/sdks/python/boot
>>>>  -> ../../../../../../../..
>>>> -rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48 
>>>> ./sdks/python/container/build/target/launcher/linux_amd64/boot
>>>> -rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48 
>>>> ./sdks/python/container/build/target/launcher/darwin_amd64/boot
>>>> -rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48 
>>>> ./sdks/python/container/py3/build/docker/target/linux_amd64/boot
>>>> -rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48 
>>>> ./sdks/python/container/py3/build/docker/target/darwin_amd64/boot
>>>> -rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48 
>>>> ./sdks/python/container/py3/build/target/linux_amd64/boot
>>>> -rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48 
>>>> ./sdks/python/container/py3/build/target/darwin_amd64/boot  
>>>> 
>>>>> On Wed, Sep 18, 2019 at 11:37 PM Benjamin Tan 
>>>>>  wrote:
>>>> 
>>>>> Try this as part of PipelineOptions:
>>>>> 
>>>>> --environment_config={\"command\":\"/opt/apache/beam/boot\"}
>>>>> 
>>>>> On 2019/09/18 10:40:42, Yu Watanabe  wrote: 
>>>>> > Hello.
>>>>> > 
>>>>> > I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit 
>>>>> > job
>>>>> > to AWS EMR (5.26.0).
>>>>> > 
>>>>> > However, I get below error when I run the pipeline and fail.
>>>>> > 
>>>>> > -
>>>>> > Caused by: java.lang.Exception: The user defined 'open()' method caused 
>>>>> > an
>>>>> > exception: java.io.IOException: Cannot run program "docker": error=2, No
>>>>> > such file or directory
>>>>> > at
>>>>> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
>>>>> > at
>>>>> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>> > ... 1 more
>>>>> > Caused by:
>>>>> > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>>>> > java.io.IOException: Cannot run program "docker": error=2, No such f

[Bug] ReadFromKafka not streaming properly on FlinkRunner in Python

2022-03-14 Thread Benjamin Tan
I've noticed some really interesting and surprising behavior with
ReadFromKafka in Python.

I'm working with a simple Apache Beam pipeline consisting of reading from
an unbounded Kafka topic and printing the values out. I have two flavors of
this. This is done via the Flink Runner.

Version 1

  with beam.Pipeline(options=beam_options) as p:
      (p
       | "Read from Kafka topic" >> ReadFromKafka(
           consumer_config=consumer_config,
           topics=[producer_topic])
       | 'log' >> beam.ParDo(LogData()))

This one uses from apache_beam.io.kafka import ReadFromKafka (i.e. the
default implementation that comes with Apache Beam).

Version 2

  with beam.Pipeline(options=beam_options) as p:
      (p
       | "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
           consumer_config={
               "topic": producer_topic,
               'auto_offset_reset': 'earliest',
               "group_id": 'transaction_classification',
               "bootstrap_servers": servers,
           }))

This one is using Beam nuggets:

from beam_nuggets.io.kafkaio import KafkaConsume

I have configured the Kafka producer to produce an element every 1 second.

What I've observed is that when I consume from ReadFromKafka (version 1),
the elements arrive batched together, with around 4-6 seconds between
batches.

On the other hand, if I tried the same thing with KafkaConsume (version 2),
then I get elements as they are produced (i.e. every second), which is
exactly the behavior I expected.

I have tried making the consumer_config the same for both, but it
doesn't seem to have any effect on version 1.

Now, I would like to stick to version 1 because that gives me proper
metrics in the Flink UI. Version 2 works better, but with it I don't get any
metrics in Flink (everything is reported as 0 bytes received / 0 records
received).

I don't understand why ReadFromKafka seems to be batching a few records
together before it gets pushed down the pipeline. Does anyone have any
ideas? This behavior doesn't show up on the Dataflow runner, though.
Is there any setting that I can try? Otherwise, how are folks dealing with
reading from Kafka for unbounded streams?
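
To put numbers on the batching, one option is to record the gap between consecutive elements inside the logging step. The sketch below is standard library only and makes some assumptions: ArrivalGapTracker is a hypothetical helper, LogData is not shown in this post so how it would call record() is a guess, and the clock is injectable only so the example can be run with simulated arrival times.

```python
import time


class ArrivalGapTracker:
    """Records the time between consecutive elements seen by one worker."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._last = None
        self.gaps = []

    def record(self):
        """Call once per element; returns the gap in seconds (None for the first)."""
        now = self._clock()
        gap = None if self._last is None else now - self._last
        self._last = now
        if gap is not None:
            self.gaps.append(gap)
        return gap


# Simulated arrivals: three elements delivered together, a 5 s pause, two more.
fake_times = iter([0.0, 0.0, 0.0, 5.0, 5.0])
tracker = ArrivalGapTracker(clock=lambda: next(fake_times))
for _ in range(5):
    tracker.record()
print(tracker.gaps)  # [0.0, 0.0, 5.0, 0.0]
```

With a producer emitting one element per second, gaps near 1 s would match what version 2 shows, while runs of near-zero gaps separated by 4-6 s pauses would match the batching seen with version 1.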