Colab vs Local IDE
Hi Team,

Is there any difference in running the Spark or Flink runners from Colab vs. locally? The code runs with no issues in the Google Colab environment, but it does not run in my local environment.

This is on Windows.

Steps:

1. Start Flink or Spark on the local machine.
2. Make sure Spark and Flink run on the local machine.
3. If Spark - start Docker like this:
   docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077
4. python "filename.py" should run, but I get raise grpc.FutureTimeoutError()

Parameters as follows:

SPARK:
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])

FLINK:
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK"
])
Re: KafkaIO memory issue
Hi Alex,

Thanks for sharing this! I think my problem is that I did not reserve enough memory for JVM non-heap usage; by default Flink sets -Xms and -Xmx to the same value, and I had allocated almost all of the memory to the heap. After adding more memory, the memory usage seems to have stabilized.

We do use a global window for other Beam pipelines along with side inputs, so I'm not sure a sliding window would work for us. Is there a ticket the Beam community is working on to fix it?

Thanks a lot
Eleanore

On Wed, Oct 28, 2020 at 10:20 Alexey Romanenko wrote:
> I don’t think it’s a KafkaIO issue since checkpoints are handled by the runner.
>
> Could it be similar to this issue?
> https://lists.apache.org/thread.html/r4a454a40197f2a59280ffeccfe44837ec072237aea56d50599f12184%40%3Cuser.beam.apache.org%3E
>
> Could you try the workaround with sliding windows proposed there?
>
> > On 22 Oct 2020, at 05:18, Eleanore Jin wrote:
> >
> > Hi all,
> >
> > I am using Beam 2.23 (Java) and Flink 1.10.2. My pipeline is quite simple: read from a Kafka topic and write to another Kafka topic.
> >
> > When I enabled checkpointing, I see the memory usage of the Flink job manager keep growing.
> >
> > The Flink cluster is running on Kubernetes, with 1 job manager and 12 task managers, each with 4 slots; the Kafka input topic has 96 partitions. The checkpoints are stored in Azure Blob Storage.
> >
> > Checkpointing happens every 3 seconds, with a timeout of 10 seconds and a minimum pause of 1 second.
> >
> > Any ideas why this happens?
> > Thanks a lot!
> > Eleanore
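For reference, the heap vs. non-heap split Eleanore describes is configured in flink-conf.yaml. A hedged sketch only - the keys below follow Flink 1.10's FLIP-49 memory model, and the values are illustrative, not recommendations:

```yaml
# Illustrative values only -- tune for your workload.
# Total memory for the TaskManager JVM process (heap + off-heap + JVM overhead).
taskmanager.memory.process.size: 4096m
# Leave headroom for non-heap usage (metaspace, thread stacks, direct buffers)
# instead of giving almost everything to the heap.
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.fraction: 0.1
# JobManager heap is still configured separately in Flink 1.10.
jobmanager.heap.size: 2048m
```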
Re: Colab vs Local IDE
> Is there any difference in running the spark or Flink runners from Colab vs Local.

Google Colab is hosted in a Linux virtual machine. Docker for Windows is missing some features, including host networking.

> 4. python "filename.py" should run but getting raise grpc.FutureTimeoutError()

Can you provide the stack trace?

On Wed, Oct 28, 2020 at 4:34 PM Ramesh Mathikumar wrote:

> Hi Team,
>
> Is there any difference in running the Spark or Flink runners from Colab vs. locally? The code runs with no issues in the Google Colab environment, but it does not run in my local environment.
>
> This is on Windows.
>
> Steps:
>
> 1. Start Flink or Spark on the local machine.
> 2. Make sure Spark and Flink run on the local machine.
> 3. If Spark - start Docker like this:
>    docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077
> 4. python "filename.py" should run, but I get raise grpc.FutureTimeoutError()
>
> Parameters as follows:
>
> SPARK:
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--job_endpoint=localhost:8099",
>     "--environment_type=LOOPBACK"
> ])
>
> FLINK:
> options = PipelineOptions([
>     "--runner=FlinkRunner",
>     "--flink_version=1.8",
>     "--flink_master=localhost:8081",
>     "--environment_type=LOOPBACK"
> ])
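A grpc.FutureTimeoutError at submission usually means the job endpoint never became reachable from the client. Before digging into Beam itself, a plain TCP connect can separate a Docker/Windows networking problem from a pipeline problem. A minimal sketch using only the standard library; the host and ports mirror the options above:

```python
import socket

def endpoint_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # The ports published by the job server container in the steps above.
    for port in (8099, 8098, 8097):
        status = "reachable" if endpoint_reachable("localhost", port) else "NOT reachable"
        print(f"localhost:{port} is {status}")
```

If the ports are not reachable, the problem is in the Docker port mapping or the Windows networking layer rather than in the Beam pipeline itself.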
Re: Spark Portable Runner + Docker
Hi Alex -- Please see the details you are looking for. I am running a sample pipeline and my environment is this:

python "SaiStudy - Apache-Beam-Spark.py" --runner=PortableRunner --job_endpoint=192.168.99.102:8099

My Spark is running in a Docker container and I can see that the JobService is running at 8099.

I am getting the following error:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1603539936.53600","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":4090,"referenced_errors":[{"created":"@1603539936.53600","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":394,"grpc_status":14}]}"

When I curl to ip:port, I can see the following error in the docker logs:

Oct 24, 2020 11:34:50 AM org.apache.beam.vendor.grpc.v1p26p0.io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2Exception: Unexpected HTTP/1.x request: GET /
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.readClientPrefaceString(Http2ConnectionHandler.java:302)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:239)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Help Please.

On 2020/10/28 15:37:22, Alexey Romanenko wrote:
> Hi Ramesh,
>
> By "+ Docker" do you mean the Docker SDK Harness or running Spark in Docker? For the former, I believe it works fine.
>
> Could you share more details of what kind of error you are facing?
>
> > On 27 Oct 2020, at 21:10, Ramesh Mathikumar wrote:
> >
> > Hi Group -- Has anyone got this to work? For me it does not, either in the IDE or in Colab. What's the community take on this one?
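One point worth noting about the docker log above: the job service is a gRPC server, and gRPC runs over HTTP/2, so curl's default HTTP/1.1 request is expected to be rejected with exactly that Http2Exception. In other words, the log line actually suggests the server is up; the client-side UNAVAILABLE error points at connectivity between the Python process and 192.168.99.102:8099 instead. A small illustrative sketch (not Beam code) of the protocol mismatch:

```python
# The Beam job service is a gRPC server, and gRPC runs over HTTP/2.
# RFC 7540 requires every HTTP/2 connection to start with this fixed preface:
HTTP2_CLIENT_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

def explain_first_bytes(first_bytes: bytes) -> str:
    """Classify what a client sent, roughly as the server transport sees it."""
    if first_bytes.startswith(HTTP2_CLIENT_PREFACE):
        return "HTTP/2 client (e.g. a gRPC client) -- accepted"
    if first_bytes.split(b" ")[0] in {b"GET", b"POST", b"HEAD", b"PUT"}:
        return "HTTP/1.x client (e.g. curl) -- rejected as an HTTP/1.x request"
    return "unknown protocol"

# curl's default request is what triggers the 'Unexpected HTTP/1.x request: GET /'
# entry seen in the docker logs.
print(explain_first_bytes(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"))
```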
Re: Issues with python's external ReadFromPubSub
Yeah, I’m able to run that. The apache_beam.io.ReadFromPubSub transform works just fine, but only for the DirectRunner in Python. In Flink we’re using the Java implementation via an external transform, apache_beam.io.external.gcp.pubsub.ReadFromPubSub. Is there a different way to do this?

On Wed, Oct 28, 2020 at 10:47 AM Kyle Weaver wrote:
> Are you able to run streaming word count on the same setup?
>
> On Tue, Oct 27, 2020 at 5:39 PM Sam Bourne wrote:
>
>> We updated from Beam 2.18.0 to 2.24.0 and have been having issues using the Python ReadFromPubSub external transform in Flink 1.10. It seems like it starts up just fine, but it doesn’t consume any messages.
>>
>> I tried to reduce it to a simple example and tested back to Beam 2.22.0 but have gotten the same results (no messages being read).
>>
>> I have a hard time believing that it’s been broken for so many versions, but I can’t seem to identify what I’ve done wrong.
>>
>> Steps to test:
>>
>> 1) Spin up the expansion service:
>> docker run -d --rm --network host apache/beam_flink1.10_job_server:2.24.0
>>
>> 2) Create a simple pipeline using apache_beam.io.external.gcp.pubsub.ReadFromPubSub. Here’s mine:
>> https://gist.github.com/sambvfx/a8582f4805e468a97331b0eb13911ebf
>>
>> 3) Run the pipeline:
>> python -m pubsub_example --runner=FlinkRunner --save_main_session --flink_submit_uber_jar --environment_type=DOCKER --environment_config=apache/beam_python3.7_sdk:2.24.0 --checkpointing_interval=1 --streaming
>>
>> 4) Emit a few Pub/Sub messages:
>> python -m pubsub_example --msg hello
>> python -m pubsub_example --msg world
>>
>> What am I missing here?
>> -Sam
>>
>> Some debugging things I’ve tried:
>> - I can run ReadFromPubSub (the DirectRunner version) just fine.
>> - I confirmed that the gcloud credentials make it into the Java SDK container that spins up. Without these you get credential type errors.
>> - I modified the Java DockerEnvironmentFactory to instead mount the GOOGLE_APPLICATION_CREDENTIALS service account .json and set the env var.
>> - I’ve tried a variety of different Flink flags.
Re: Issues with python's external ReadFromPubSub
Are you able to run streaming word count on the same setup?

On Tue, Oct 27, 2020 at 5:39 PM Sam Bourne wrote:

> We updated from Beam 2.18.0 to 2.24.0 and have been having issues using the Python ReadFromPubSub external transform in Flink 1.10. It seems like it starts up just fine, but it doesn’t consume any messages.
>
> I tried to reduce it to a simple example and tested back to Beam 2.22.0 but have gotten the same results (no messages being read).
>
> I have a hard time believing that it’s been broken for so many versions, but I can’t seem to identify what I’ve done wrong.
>
> Steps to test:
>
> 1) Spin up the expansion service:
> docker run -d --rm --network host apache/beam_flink1.10_job_server:2.24.0
>
> 2) Create a simple pipeline using apache_beam.io.external.gcp.pubsub.ReadFromPubSub. Here’s mine:
> https://gist.github.com/sambvfx/a8582f4805e468a97331b0eb13911ebf
>
> 3) Run the pipeline:
> python -m pubsub_example --runner=FlinkRunner --save_main_session --flink_submit_uber_jar --environment_type=DOCKER --environment_config=apache/beam_python3.7_sdk:2.24.0 --checkpointing_interval=1 --streaming
>
> 4) Emit a few Pub/Sub messages:
> python -m pubsub_example --msg hello
> python -m pubsub_example --msg world
>
> What am I missing here?
> -Sam
>
> Some debugging things I’ve tried:
> - I can run ReadFromPubSub (the DirectRunner version) just fine.
> - I confirmed that the gcloud credentials make it into the Java SDK container that spins up. Without these you get credential type errors.
> - I modified the Java DockerEnvironmentFactory to instead mount the GOOGLE_APPLICATION_CREDENTIALS service account .json and set the env var.
> - I’ve tried a variety of different Flink flags.
Re: [DISCUSS] Update Kafka dependencies in Beam Java SDK
*raising this question, of course

> On 28 Oct 2020, at 18:06, Alexey Romanenko wrote:
>
> tasing this question
Re: KafkaIO memory issue
I don’t think it’s a KafkaIO issue since checkpoints are handled by the runner.

Could it be similar to this issue?
https://lists.apache.org/thread.html/r4a454a40197f2a59280ffeccfe44837ec072237aea56d50599f12184%40%3Cuser.beam.apache.org%3E

Could you try the workaround with sliding windows proposed there?

> On 22 Oct 2020, at 05:18, Eleanore Jin wrote:
>
> Hi all,
>
> I am using Beam 2.23 (Java) and Flink 1.10.2. My pipeline is quite simple: read from a Kafka topic and write to another Kafka topic.
>
> When I enabled checkpointing, I see the memory usage of the Flink job manager keep growing.
>
> The Flink cluster is running on Kubernetes, with 1 job manager and 12 task managers, each with 4 slots; the Kafka input topic has 96 partitions. The checkpoints are stored in Azure Blob Storage.
>
> Checkpointing happens every 3 seconds, with a timeout of 10 seconds and a minimum pause of 1 second.
>
> Any ideas why this happens?
> Thanks a lot!
> Eleanore
Re: [DISCUSS] Update Kafka dependencies in Beam Java SDK
Piotr, thank you for tasing this question. Let me ask some questions first.

What would this dependency update give us? What are the pros and cons? Can users use recent versions of the Kafka client with the current implementation based on the ConsumerSpEL class?

> On 22 Oct 2020, at 10:47, Piotr Szuberski wrote:
>
> Should we update the Kafka dependencies to the recent ones (kafka-clients to 2.6.0 and kafka_2.11 to 2.4.1)?
>
> What would have to be done to keep backwards compatibility, and which previous versions would we want to support?
>
> Kafka's backward compatibility is quite good, so maybe there wouldn't be anything to do?
>
> Let's vote/discuss.
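For context on the ConsumerSpEL question: as I understand it, users can already pin their own kafka-clients version in the build and KafkaIO adapts to client API differences at runtime, which is worth weighing against a hard dependency bump. A hedged Maven sketch - the version numbers here are purely illustrative:

```xml
<!-- Illustrative only: the application declares its own kafka-clients
     version alongside Beam's KafkaIO module. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.25.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.6.0</version>
</dependency>
```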
Re: Spark Portable Runner + Docker
Hi Ramesh,

By “+ Docker” do you mean the Docker SDK Harness or running Spark in Docker? For the former, I believe it works fine.

Could you share more details of what kind of error you are facing?

> On 27 Oct 2020, at 21:10, Ramesh Mathikumar wrote:
>
> Hi Group -- Has anyone got this to work? For me it does not, either in the IDE or in Colab. What's the community take on this one?
Re: Which Solr versions should be supported by Beam
So I think we can leave it as it is. The only problem would be if a user has a project using Beam and a different Solr dependency at once - then Beam would force them to use the version Beam does. Should I change the dependency to 'provided' to cover this case? Earlier we had version 5.5.2 as a compile dependency and there were no complaints.

On 2020/10/28 09:09:42, Piotr Szuberski wrote:
> Response from Solr:
>
> Generally speaking, SolrJ has been very compatible communicating with many backend Solr server versions. I wish we tracked problems about this specifically somewhere, but I don't think we do. I suggest simply using the latest SolrJ release. If you find issues, report them. Again, assuming SolrJ, it's good to have some flexibility on which SolrClient subclass is used. There's Cloud vs. not (i.e. standalone), there's newer HTTP2 vs. not. There's Cloud talking directly to ZooKeeper for cluster state, or there's via Solr HTTP.
>
> On 2020/10/23 20:14:22, Kenneth Knowles wrote:
> > This might be a good question for u...@solr.apache.org and/or d...@solr.apache.org, too.
> >
> > Kenn
> >
> > On Fri, Oct 23, 2020 at 6:24 AM Piotr Szuberski wrote:
> >
> > > Beam has quite an old Solr dependency (5.x.y) which has been deprecated for a long time.
> > >
> > > The Solr dependency has recently been updated to 8.6.y, but there is a question of which versions should be supported.
> > >
> > > Are there users using versions older than 7.x.y?
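For reference, the 'provided' change being proposed would look roughly like this in Maven terms (artifact coordinates and version are illustrative). With this scope the dependency is available at compile time but is not forced onto the user's runtime classpath, so their own SolrJ version wins:

```xml
<!-- Illustrative sketch: 'provided' keeps SolrJ off the transitive
     runtime classpath, so users supply their own version. -->
<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-solrj</artifactId>
  <version>8.6.3</version>
  <scope>provided</scope>
</dependency>
```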
Re: Which Solr versions should be supported by Beam
Response from Solr:

Generally speaking, SolrJ has been very compatible communicating with many backend Solr server versions. I wish we tracked problems about this specifically somewhere, but I don't think we do. I suggest simply using the latest SolrJ release. If you find issues, report them. Again, assuming SolrJ, it's good to have some flexibility on which SolrClient subclass is used. There's Cloud vs. not (i.e. standalone), there's newer HTTP2 vs. not. There's Cloud talking directly to ZooKeeper for cluster state, or there's via Solr HTTP.

On 2020/10/23 20:14:22, Kenneth Knowles wrote:
> This might be a good question for u...@solr.apache.org and/or d...@solr.apache.org, too.
>
> Kenn
>
> On Fri, Oct 23, 2020 at 6:24 AM Piotr Szuberski wrote:
>
> > Beam has quite an old Solr dependency (5.x.y) which has been deprecated for a long time.
> >
> > The Solr dependency has recently been updated to 8.6.y, but there is a question of which versions should be supported.
> >
> > Are there users using versions older than 7.x.y?