I am not using the Python local runner but the Flink runner; a Flink cluster is started locally.
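In case it helps, this is how I confirm the local cluster is actually up before submitting (a quick stdlib sketch against Flink's REST API; `/overview` and its `taskmanagers` field come from Flink's REST API, and the default local REST port 8081 is assumed):

```python
import json
import urllib.request


def overview_url(base: str = "http://localhost:8081") -> str:
    # Flink's REST API exposes cluster-wide status at /overview
    return base.rstrip("/") + "/overview"


def taskmanager_count(base: str = "http://localhost:8081") -> int:
    """Number of registered task managers; needs the cluster to be running."""
    with urllib.request.urlopen(overview_url(base), timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))["taskmanagers"]
```

With the cluster started, a non-zero `taskmanager_count()` tells me the task managers registered before I start chasing container errors.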
On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <[email protected]> wrote:

> Streaming portable pipelines are not yet supported on the Python local
> runner.
>
> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <[email protected]> wrote:
> >
> > Hello,
> >
> > I use the Python SDK, and my pipeline reads messages from Kafka and
> > transforms them via SQL. I see that two containers are created, but they
> > don't seem to communicate with each other, so the Flink task manager
> > keeps killing them. The Flink cluster runs locally. Is there a way to
> > run two multi-language pipelines (running on Docker) that communicate
> > with each other?
> >
> > Cheers,
> > Jaehyeon
> >
> > def run():
> >     parser = argparse.ArgumentParser(
> >         description="Process statistics by user from website visit event"
> >     )
> >     parser.add_argument(
> >         "--inputs",
> >         default="inputs",
> >         help="Specify folder name that event records are saved",
> >     )
> >     parser.add_argument(
> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> >     )
> >     opts = parser.parse_args()
> >
> >     pipeline_opts = {
> >         "runner": opts.runner,
> >         "flink_master": "localhost:8081",
> >         "job_name": "traffic-agg-sql",
> >         "environment_type": "LOOPBACK",
> >         "streaming": True,
> >         "parallelism": 3,
> >         "experiments": ["use_deprecated_read"],  ## https://github.com/apache/beam/issues/20979
> >         "checkpointing_interval": "60000",
> >     }
> >     options = PipelineOptions([], **pipeline_opts)
> >     # Required, else it will complain when importing worker functions
> >     options.view_as(SetupOptions).save_main_session = True
> >
> >     query = """
> >     WITH cte AS (
> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> >         FROM PCOLLECTION
> >     )
> >     SELECT
> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> >         COUNT(*) AS page_view
> >     FROM cte
> >     GROUP BY
> >         TUMBLE(ts, INTERVAL '10' SECOND), id
> >     """
> >
> >     p = beam.Pipeline(options=options)
> >     (
> >         p
> >         | "Read from Kafka"
> >         >> kafka.ReadFromKafka(
> >             consumer_config={
> >                 "bootstrap.servers": os.getenv(
> >                     "BOOTSTRAP_SERVERS",
> >                     "host.docker.internal:29092",
> >                 ),
> >                 "auto.offset.reset": "earliest",
> >                 # "enable.auto.commit": "true",
> >                 "group.id": "traffic-agg",
> >             },
> >             topics=["website-visit"],
> >         )
> >         | "Decode messages" >> beam.Map(decode_message)
> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
> >         | "Count per minute" >> SqlTransform(query)
> >         | beam.Map(print)
> >     )
> >
> >     logging.getLogger().setLevel(logging.INFO)
> >     logging.info("Building pipeline ...")
> >
> >     p.run().wait_until_finish()
> >
> > Here is the error message from the Flink UI:
> >
> > 2024-03-07 12:01:41
> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >     at java.base/java.lang.Thread.run(Thread.java:829)
> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
> >     ... 20 more
> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
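For completeness: the thread doesn't show the helper functions the pipeline references (`decode_message`, `parse_json`, `EventLog`). A minimal sketch of what they could look like — the `EventLog` fields here are guesses based on the SQL (`id`, `event_datetime`), not the actual definitions:

```python
import json
from typing import NamedTuple, Tuple


class EventLog(NamedTuple):
    # Hypothetical schema: only the fields the SQL query reads
    id: str
    event_datetime: str


def decode_message(kafka_kv: Tuple[bytes, bytes]) -> str:
    # ReadFromKafka emits (key, value) pairs of bytes; keep the value as text
    return kafka_kv[1].decode("utf-8")


def parse_json(element: str) -> EventLog:
    # Turn one JSON record into the typed element SqlTransform consumes
    parsed = json.loads(element)
    return EventLog(id=parsed["id"], event_datetime=parsed["event_datetime"])
```

(In the real pipeline the `EventLog` type would also need a registered schema/coder for `SqlTransform` to accept it; this sketch only covers the decoding and parsing steps.)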
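For anyone reading the SQL above: `TUMBLE(ts, INTERVAL '10' SECOND)` assigns each row to a fixed, non-overlapping 10-second window. The window bounds can be reproduced in plain Python (a sketch of the semantics for whole-second window sizes, not Beam/Calcite code):

```python
from datetime import datetime, timedelta, timezone


def tumble_bounds(ts: datetime, size_s: int = 10):
    """Start/end of the tumbling window containing ts,
    mirroring TUMBLE_START/TUMBLE_END for whole-second sizes."""
    epoch = int(ts.timestamp())
    start = datetime.fromtimestamp(epoch - epoch % size_s, tz=timezone.utc)
    return start, start + timedelta(seconds=size_s)
```

For example, an event at 12:00:07 lands in the [12:00:00, 12:00:10) window, which is the group the `GROUP BY TUMBLE(...)` clause counts into `page_view`.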
