It may be a configuration issue. It works if I don't specify *flink_master*,
in which case the runner uses an embedded Flink cluster.
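
For reference, a minimal sketch of that working configuration (the option
names mirror the code further down; everything else is unchanged):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [],
    runner="FlinkRunner",
    # flink_master deliberately omitted: the FlinkRunner then starts an
    # embedded, in-process Flink cluster instead of submitting the job
    # to localhost:8081.
    job_name="traffic-agg-sql",
    environment_type="LOOPBACK",
    streaming=True,
    parallelism=3,
)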

On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim <dott...@gmail.com> wrote:

> It works fine if I only use Kafka read/write, as I only see a single
> container - two transforms (read and write) but a single container.
>
> If I add SqlTransform, however, another container is created and errors
> begin to appear. My speculation is that the containers don't recognise
> each other and get killed by the Flink task manager. I see containers
> being repeatedly created and killed.
>
> Does every multi-language pipeline run in a separate container?
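>
> For what it's worth, a minimal sketch of why I suspect two Java
> environments show up (my assumption, not confirmed): Kafka read and write
> both expand against the Java IO expansion service, so they share one
> container, while SqlTransform defaults to the separate SQL
> expansion-service jar and so brings up a second one.
>
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
> from apache_beam.transforms.sql import SqlTransform
>
> # Both Kafka transforms use the same Java IO expansion service by default,
> # hence a single Java SDK harness container for read and write.
> read = ReadFromKafka(
>     consumer_config={"bootstrap.servers": "host.docker.internal:29092"},
>     topics=["website-visit"],
>     expansion_service=default_io_expansion_service(),
> )
>
> # SqlTransform defaults to the SQL expansion-service jar, i.e. a different
> # Java environment and therefore a second container.
> sql = SqlTransform("SELECT COUNT(*) AS cnt FROM PCOLLECTION")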
>
> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, <
> user@beam.apache.org> wrote:
>
>> Oh, sorry, I didn't see that.
>>
>> I would look earlier in the logs and see why it failed to bring up the
>> containers (or, if they started, look in the container logs to see why
>> they died).
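>>
>> In case it helps, a minimal sketch for pulling those container logs from
>> Python (assuming the default apache/beam_*_sdk harness images, whose
>> names contain "beam"; plain `docker ps -a` / `docker logs <id>` works
>> just as well):
>>
>> import subprocess
>>
>> # List exited containers and print the logs of the first exited Beam SDK
>> # harness container found.
>> ps = subprocess.run(
>>     ["docker", "ps", "-a", "--filter", "status=exited",
>>      "--format", "{{.ID}} {{.Image}}"],
>>     capture_output=True, text=True, check=True,
>> )
>> for line in ps.stdout.splitlines():
>>     container_id, image = line.split(maxsplit=1)
>>     if "beam" in image:
>>         print(container_id, image)
>>         logs = subprocess.run(
>>             ["docker", "logs", container_id],
>>             capture_output=True, text=True,
>>         )
>>         print(logs.stdout or logs.stderr)
>>         break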
>>
>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>> >
>> > I am not using the Python local runner but the Flink runner. A Flink
>> > cluster is started locally.
>> >
>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>> >>
>> >> Streaming portable pipelines are not yet supported on the Python local
>> >> runner.
>> >>
>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > I use the Python SDK and my pipeline reads messages from Kafka and
>> >> > transforms them via SQL. I see that two containers are created, but
>> >> > they don't seem to communicate with each other, so the Flink task
>> >> > manager keeps killing them. The Flink cluster runs locally. Is there
>> >> > a way to run two multi-language pipelines (running on Docker) that
>> >> > communicate with each other?
>> >> >
>> >> > Cheers,
>> >> > Jaehyeon
>> >> >
>> >> >
>> >> >
>> >> > import argparse
>> >> > import logging
>> >> > import os
>> >> >
>> >> > import apache_beam as beam
>> >> > from apache_beam.io import kafka
>> >> > from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
>> >> > from apache_beam.transforms.sql import SqlTransform
>> >> >
>> >> > def run():
>> >> >     parser = argparse.ArgumentParser(
>> >> >         description="Process statistics by user from website visit event"
>> >> >     )
>> >> >     parser.add_argument(
>> >> >         "--inputs",
>> >> >         default="inputs",
>> >> >         help="Specify folder name that event records are saved",
>> >> >     )
>> >> >     parser.add_argument(
>> >> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>> >> >     )
>> >> >     opts = parser.parse_args()
>> >> >
>> >> >     pipeline_opts = {
>> >> >         "runner": opts.runner,
>> >> >         "flink_master": "localhost:8081",
>> >> >         "job_name": "traffic-agg-sql",
>> >> >         "environment_type": "LOOPBACK",
>> >> >         "streaming": True,
>> >> >         "parallelism": 3,
>> >> >         "experiments": [
>> >> >             "use_deprecated_read"
>> >> >         ],  ## https://github.com/apache/beam/issues/20979
>> >> >         "checkpointing_interval": "60000",
>> >> >     }
>> >> >     options = PipelineOptions([], **pipeline_opts)
>> >> >     # Required, or it complains when importing worker functions
>> >> >     options.view_as(SetupOptions).save_main_session = True
>> >> >
>> >> >     query = """
>> >> >     WITH cte AS (
>> >> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>> >> >         FROM PCOLLECTION
>> >> >     )
>> >> >     SELECT
>> >> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
>> >> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
>> >> >         COUNT(*) AS page_view
>> >> >     FROM cte
>> >> >     GROUP BY
>> >> >         TUMBLE(ts, INTERVAL '10' SECOND), id
>> >> >     """
>> >> >
>> >> >     p = beam.Pipeline(options=options)
>> >> >     (
>> >> >         p
>> >> >         | "Read from Kafka"
>> >> >         >> kafka.ReadFromKafka(
>> >> >             consumer_config={
>> >> >                 "bootstrap.servers": os.getenv(
>> >> >                     "BOOTSTRAP_SERVERS",
>> >> >                     "host.docker.internal:29092",
>> >> >                 ),
>> >> >                 "auto.offset.reset": "earliest",
>> >> >                 # "enable.auto.commit": "true",
>> >> >                 "group.id": "traffic-agg",
>> >> >             },
>> >> >             topics=["website-visit"],
>> >> >         )
>> >> >         | "Decode messages" >> beam.Map(decode_message)
>> >> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>> >> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
>> >> >         | "Count per minute" >> SqlTransform(query)
>> >> >         | beam.Map(print)
>> >> >     )
>> >> >
>> >> >     logging.getLogger().setLevel(logging.INFO)
>> >> >     logging.info("Building pipeline ...")
>> >> >
>> >> >     p.run().wait_until_finish()
>> >> >
>> >> > Here is the error message from the Flink UI.
>> >> >
>> >> > 2024-03-07 12:01:41
>> >> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>> >> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
>> >> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
>> >> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
>> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> >> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> >> >     at java.base/java.lang.Thread.run(Thread.java:829)
>> >> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
>> >> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
>> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
>> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
>> >> >     ... 20 more
>> >> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
>> >> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
>> >> >
>>
>
