Hello,

I use the Python SDK, and my pipeline reads messages from Kafka and
transforms them via SQL. I see that two containers are created, but they
don't seem to communicate with each other, and the Flink task manager keeps
killing the containers. The Flink cluster runs locally. Is there a way to
run two multi-language pipelines (running on Docker) that communicate with
each other?

Cheers,
Jaehyeon

[image: image.png]

def run():
    """Build and run the streaming website-visit aggregation pipeline.

    The pipeline reads records from the ``website-visit`` Kafka topic,
    decodes and parses them into ``EventLog`` elements, reformats their
    timestamps, aggregates them with a SQL query over 10-second tumbling
    windows (row count per window and ``id``), and prints each result row.

    Command-line arguments:
        --inputs: folder name for event records (parsed but not used by
            this function).
        --runner: Apache Beam runner name; defaults to ``FlinkRunner``.

    ``decode_message``, ``parse_json``, ``format_timestamp`` and ``EventLog``
    are defined outside this function.

    The call blocks until the pipeline finishes.
    """
    parser = argparse.ArgumentParser(
        description="Process statistics by user from website visit event"
    )
    # NOTE(review): --inputs is accepted but never read below; kept so that
    # existing invocations that pass it keep working.
    parser.add_argument(
        "--inputs",
        default="inputs",
        help="Specify folder name that event records are saved",
    )
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "flink_master": "localhost:8081",
        "job_name": "traffic-agg-sql",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        # Workaround tracked in https://github.com/apache/beam/issues/20979
        "experiments": [
            "use_deprecated_read"
        ],
        "checkpointing_interval": "60000",
    }
    # Pass an empty flag list so the options come only from pipeline_opts and
    # not from the process command line (already consumed by argparse above).
    options = PipelineOptions([], **pipeline_opts)
    # Required; without it the job complains when importing the worker
    # functions used by the transforms below.
    options.view_as(SetupOptions).save_main_session = True

    # Counts rows per (10-second tumbling window, id). The window bounds are
    # cast to VARCHAR for output.
    query = """
    WITH cte AS (
        SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
        FROM PCOLLECTION
    )
    SELECT
        CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS
window_start,
        CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
        COUNT(*) AS page_view
    FROM cte
    GROUP BY
        TUMBLE(ts, INTERVAL '10' SECOND), id
    """

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements"
        >> beam.Map(parse_json).with_output_types(EventLog)
        | "Format timestamp"
        >> beam.Map(format_timestamp).with_output_types(EventLog)
        # NOTE(review): the label says "per minute" but the query windows are
        # 10 seconds; label left unchanged to keep the transform name stable.
        | "Count per minute" >> SqlTransform(query)
        | beam.Map(print)
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()

Here is the error message from the Flink UI.

2024-03-07 12:01:41
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.
UncheckedExecutionException: java.lang.IllegalStateException: No container
running for id
cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.
DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(
DefaultJobBundleFactory.java:458)
    at org.apache.beam.runners.fnexecution.control.
DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(
DefaultJobBundleFactory.java:443)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory
.forStage(DefaultJobBundleFactory.java:310)
    at org.apache.beam.runners.fnexecution.control.
DefaultExecutableStageContext.getStageBundleFactory(
DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.
ReferenceCountingExecutableStageContextFactory$WrappedContext
.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:
207)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.
ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(
StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(
StreamTask.java:706)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(
StreamTask.java:672)
    at org.apache.flink.runtime.taskmanager.Task
.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:
904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: No container running for id
cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
    at org.apache.beam.runners.fnexecution.environment.
DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137
)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1
.load(DefaultJobBundleFactory.java:259)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1
.load(DefaultJobBundleFactory.java:232)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$Segment.loadSync(LocalCache.java:2313)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.
LocalCache$Segment.get(LocalCache.java:2080)
    ... 20 more
    Suppressed: java.io.IOException: Received exit code 1 for command 'docker
kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'.
stderr: Error response from daemon: Cannot kill container:
cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container
cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not
running
        at org.apache.beam.runners.fnexecution.environment.DockerCommand
.runShortCommand(DockerCommand.java:255)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand
.runShortCommand(DockerCommand.java:181)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand
.killContainer(DockerCommand.java:161)
        at org.apache.beam.runners.fnexecution.environment.
DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161
)

Reply via email to