It may be a configuration issue. It works if I don't specify *flink_master*,
in which case an embedded Flink cluster is used.
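For reference, the working options are the same as pipeline_opts in the
script quoted below, just without *flink_master* - roughly this sketch
(trimmed from the full script; dropping that one key is the only change, so
the FlinkRunner starts an embedded cluster instead of submitting to the one
at localhost:8081):

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_opts = {
    "runner": "FlinkRunner",
    # "flink_master": "localhost:8081",  # omitted -> embedded Flink cluster
    "job_name": "traffic-agg-sql",
    "environment_type": "LOOPBACK",
    "streaming": True,
    "parallelism": 3,
    "experiments": ["use_deprecated_read"],  ## https://github.com/apache/beam/issues/20979
    "checkpointing_interval": "60000",
}
options = PipelineOptions([], **pipeline_opts)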
On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim <dott...@gmail.com> wrote:

> It works fine if I only use Kafka read/write, as I only see a single
> container - two transforms (read and write) but a single container.
>
> If I add SqlTransform, however, another container is created and it
> begins to create an error. My speculation is that the containers don't
> recognise each other and get killed by the Flink task manager. I see
> containers being created and killed repeatedly.
>
> Does every multi-language pipeline run in a separate container?
>
> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, <user@beam.apache.org> wrote:
>
>> Oh, sorry, I didn't see that.
>>
>> I would look earlier in the logs and see why it failed to bring up the
>> containers (or, if they started, look in the container logs to see why
>> they died).
>>
>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>> >
>> > I am not using the Python local runner but the Flink runner. A Flink
>> > cluster is started locally.
>> >
>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <user@beam.apache.org> wrote:
>> >>
>> >> Streaming portable pipelines are not yet supported on the Python
>> >> local runner.
>> >>
>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > I use the Python SDK and my pipeline reads messages from Kafka and
>> >> > transforms them via SQL. I see two containers being created, but it
>> >> > seems that they don't communicate with each other, so the Flink
>> >> > task manager keeps killing them. The Flink cluster runs locally.
>> >> > Is there a way to run two multi-language pipelines (running on
>> >> > Docker) that communicate with each other?
>> >> >
>> >> > Cheers,
>> >> > Jaehyeon
>> >> >
>> >> > def run():
>> >> >     parser = argparse.ArgumentParser(
>> >> >         description="Process statistics by user from website visit event"
>> >> >     )
>> >> >     parser.add_argument(
>> >> >         "--inputs",
>> >> >         default="inputs",
>> >> >         help="Specify folder name that event records are saved",
>> >> >     )
>> >> >     parser.add_argument(
>> >> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>> >> >     )
>> >> >     opts = parser.parse_args()
>> >> >
>> >> >     options = PipelineOptions()
>> >> >     pipeline_opts = {
>> >> >         "runner": opts.runner,
>> >> >         "flink_master": "localhost:8081",
>> >> >         "job_name": "traffic-agg-sql",
>> >> >         "environment_type": "LOOPBACK",
>> >> >         "streaming": True,
>> >> >         "parallelism": 3,
>> >> >         "experiments": [
>> >> >             "use_deprecated_read"
>> >> >         ],  ## https://github.com/apache/beam/issues/20979
>> >> >         "checkpointing_interval": "60000",
>> >> >     }
>> >> >     options = PipelineOptions([], **pipeline_opts)
>> >> >     # Required, else it will complain when importing worker functions
>> >> >     options.view_as(SetupOptions).save_main_session = True
>> >> >
>> >> >     query = """
>> >> >     WITH cte AS (
>> >> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>> >> >         FROM PCOLLECTION
>> >> >     )
>> >> >     SELECT
>> >> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
>> >> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
>> >> >         COUNT(*) AS page_view
>> >> >     FROM cte
>> >> >     GROUP BY
>> >> >         TUMBLE(ts, INTERVAL '10' SECOND), id
>> >> >     """
>> >> >
>> >> >     p = beam.Pipeline(options=options)
>> >> >     (
>> >> >         p
>> >> >         | "Read from Kafka"
>> >> >         >> kafka.ReadFromKafka(
>> >> >             consumer_config={
>> >> >                 "bootstrap.servers": os.getenv(
>> >> >                     "BOOTSTRAP_SERVERS",
>> >> >                     "host.docker.internal:29092",
>> >> >                 ),
>> >> >                 "auto.offset.reset": "earliest",
>> >> >                 # "enable.auto.commit": "true",
>> >> >                 "group.id": "traffic-agg",
>> >> >             },
>> >> >             topics=["website-visit"],
>> >> >         )
>> >> >         | "Decode messages" >> beam.Map(decode_message)
>> >> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>> >> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
>> >> >         | "Count per minute" >> SqlTransform(query)
>> >> >         | beam.Map(print)
>> >> >     )
>> >> >
>> >> >     logging.getLogger().setLevel(logging.INFO)
>> >> >     logging.info("Building pipeline ...")
>> >> >
>> >> >     p.run().wait_until_finish()
>> >> >
>> >> > Here is the error message from the Flink UI.
>> >> >
>> >> > 2024-03-07 12:01:41
>> >> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>> >> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
>> >> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
>> >> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> >> >     at java.base/java.lang.Thread.run(Thread.java:829)
>> >> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>> >> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
>> >> >     ... 20 more
>> >> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
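
P.S. For anyone who finds this thread with the same "No container running
for id ..." error: as Robert suggested above, the first step is to pull the
logs of the dead SDK harness container - the id in the exception is a plain
Docker container id. A small helper for that (my own sketch, not part of
Beam; it assumes the docker CLI is on PATH and that the container has not
been removed yet) might look like:

import subprocess

def dump_container_logs(container_id: str) -> None:
    # "docker logs" also works for stopped containers, as long as they
    # have not been removed yet (e.g. not started with --rm).
    result = subprocess.run(
        ["docker", "logs", container_id], capture_output=True, text=True
    )
    print(result.stdout)
    print(result.stderr)

dump_container_logs("cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8")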