I am not using the Python local runner but the Flink runner. A Flink
cluster is started locally.
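For the "No container running for id ..." failure quoted below, a common first step is to inspect the dead SDK-harness container on the task manager host. A minimal sketch, assuming the Docker CLI is available there; the id is the one from the error message in the Flink UI:

```shell
# Assumption: the Docker CLI is available on the Flink task manager host.
# The id comes from the "No container running for id ..." error below.
CID=cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8

# Was the container ever created, and with what exit status?
docker ps -a --no-trunc --filter "id=$CID" || true

# Why the SDK harness exited (startup errors usually appear here).
docker logs "$CID" 2>&1 | tail -n 50 || true
```

If `docker logs` shows the harness failing to reach the job server or the expansion service's artifact endpoint, the problem is networking between the containers rather than the pipeline code.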

On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <[email protected]>
wrote:

> Streaming portable pipelines are not yet supported on the Python local
> runner.
>
> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <[email protected]> wrote:
> >
> > Hello,
> >
> > I use the Python SDK, and my pipeline reads messages from Kafka and
> transforms them via SQL. Two containers are created, but they don't appear
> to communicate with each other, so the Flink task manager keeps killing
> them. The Flink cluster runs locally. Is there a way to run two
> multi-language pipelines (running on Docker) that communicate with each
> other?
> >
> > Cheers,
> > Jaehyeon
> >
> >
> >
> > def run():
> >     parser = argparse.ArgumentParser(
> >         description="Process statistics by user from website visit event"
> >     )
> >     parser.add_argument(
> >         "--inputs",
> >         default="inputs",
> >         help="Specify folder name that event records are saved",
> >     )
> >     parser.add_argument(
> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> >     )
> >     opts = parser.parse_args()
> >
> >     options = PipelineOptions()
> >     pipeline_opts = {
> >         "runner": opts.runner,
> >         "flink_master": "localhost:8081",
> >         "job_name": "traffic-agg-sql",
> >         "environment_type": "LOOPBACK",
> >         "streaming": True,
> >         "parallelism": 3,
> >         "experiments": [
> >             "use_deprecated_read"
> >         ],  ## https://github.com/apache/beam/issues/20979
> >         "checkpointing_interval": "60000",
> >     }
> >     options = PipelineOptions([], **pipeline_opts)
> >     # Required; otherwise it complains when importing worker functions
> >     options.view_as(SetupOptions).save_main_session = True
> >
> >     query = """
> >     WITH cte AS (
> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> >         FROM PCOLLECTION
> >     )
> >     SELECT
> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> >         COUNT(*) AS page_view
> >     FROM cte
> >     GROUP BY
> >         TUMBLE(ts, INTERVAL '10' SECOND), id
> >     """
> >
> >     p = beam.Pipeline(options=options)
> >     (
> >         p
> >         | "Read from Kafka"
> >         >> kafka.ReadFromKafka(
> >             consumer_config={
> >                 "bootstrap.servers": os.getenv(
> >                     "BOOTSTRAP_SERVERS",
> >                     "host.docker.internal:29092",
> >                 ),
> >                 "auto.offset.reset": "earliest",
> >                 # "enable.auto.commit": "true",
> >                 "group.id": "traffic-agg",
> >             },
> >             topics=["website-visit"],
> >         )
> >         | "Decode messages" >> beam.Map(decode_message)
> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
> >         | "Count per window" >> SqlTransform(query)
> >         | beam.Map(print)
> >     )
> >
> >     logging.getLogger().setLevel(logging.INFO)
> >     logging.info("Building pipeline ...")
> >
> >     p.run().wait_until_finish()
> >
> > Here is the error message from the Flink UI.
> >
> > 2024-03-07 12:01:41
> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >     at java.base/java.lang.Thread.run(Thread.java:829)
> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
> >     ... 20 more
> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
> >
>
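The `TUMBLE(ts, INTERVAL '10' SECOND)` in the quoted SQL assigns each event to a fixed, non-overlapping 10-second window. A plain-Python illustration of that bucketing (not Beam code; the event times are made up, and the per-`id` grouping is omitted for brevity):

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=10)

def tumble_start(ts: datetime) -> datetime:
    # Floor the timestamp to the start of its 10-second tumbling window.
    seconds_into_window = ts.timestamp() % WINDOW.total_seconds()
    return ts - timedelta(seconds=seconds_into_window)

# Hypothetical event times standing in for CAST(event_datetime AS TIMESTAMP).
events = [
    datetime(2024, 3, 7, 12, 0, s, tzinfo=timezone.utc) for s in (1, 4, 9, 12, 18)
]

# COUNT(*) per window, i.e. the query's page_view column.
page_views = Counter(tumble_start(ts) for ts in events)
# [12:00:00, 12:00:10) holds three events; [12:00:10, 12:00:20) holds two.
```

TUMBLE_START and TUMBLE_END in the query are then just this window start and start + 10 seconds, cast to VARCHAR.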
