Hello, Attached includes more logging details.
WARNING:root:Waiting for grpc channel to be ready at localhost:45371. INFO:root:Building pipeline ... INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:36959 INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x7fd0ac33d630> ==================== INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7fd0ac33d6c0> ==================== INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7fd0ac33de10> ==================== INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol scheme to flink_master parameter: http://localhost:8081 INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16-job-server/2.53.0/beam-runners-flink-1.16-job-server-2.53.0.jar INFO:apache_beam.utils.subprocess_server:Starting service with ('java' '-jar' '/home/jaehyeon/.apache_beam/cache/jars/beam-runners-flink-1.16-job-server-2.53.0.jar' '--flink-master' 'http://localhost:8081' '--artifacts-dir' '/tmp/beam-temp80sx9jtu/artifactsv2s2mbvo' '--job-port' '50353' '--artifact-port' '0' '--expansion-port' '0') INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM org.apache.beam.runners.jobsubmission.JobServerDriver createArtifactStagingService INFO:apache_beam.utils.subprocess_server:INFO: ArtifactStagingService started on localhost:38121 INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM org.apache.beam.runners.jobsubmission.JobServerDriver createExpansionService INFO:apache_beam.utils.subprocess_server:INFO: Java ExpansionService started on localhost:36431 INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM org.apache.beam.runners.jobsubmission.JobServerDriver createJobServer INFO:apache_beam.utils.subprocess_server:INFO: JobService started on localhost:50353 INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM org.apache.beam.runners.jobsubmission.JobServerDriver run INFO:apache_beam.utils.subprocess_server:INFO: Job server now running, terminate with Ctrl+C WARNING:root:Waiting for grpc channel to be ready at localhost:50353. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext INFO:apache_beam.utils.subprocess_server:INFO: Staging artifacts for job_ddf6a897-788d-49ca-8d14-51029ce17f58. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 resolveNextEnvironment INFO:apache_beam.utils.subprocess_server:INFO: Resolving artifacts for job_ddf6a897-788d-49ca-8d14-51029ce17f58.0 :ref_Environment_default_environment_2. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext INFO:apache_beam.utils.subprocess_server:INFO: Getting 1 artifacts for job_ddf6a897-788d-49ca-8d14-51029ce17f58.0:external_1beam:env:docker:v1. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 resolveNextEnvironment INFO:apache_beam.utils.subprocess_server:INFO: Resolving artifacts for job_ddf6a897-788d-49ca-8d14-51029ce17f58.0:external_1beam:env:docker:v1. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext INFO:apache_beam.utils.subprocess_server:INFO: Getting 1 artifacts for job_ddf6a897-788d-49ca-8d14-51029ce17f58.null. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 finishStaging INFO:apache_beam.utils.subprocess_server:INFO: Artifacts fully staged for job_ddf6a897-788d-49ca-8d14-51029ce17f58. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM org.apache.beam.runners.flink.FlinkJobInvoker invokeWithExecutor INFO:apache_beam.utils.subprocess_server:INFO: Invoking job sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@2f4dfe06 INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM org.apache.beam.runners.jobsubmission.JobInvocation start INFO:apache_beam.utils.subprocess_server:INFO: Starting job invocation sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using with Pipeline() as p: p.apply(..) This ensures that the pipeline finishes before this program exits. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM org.apache.beam.runners.flink.FlinkPipelineRunner runPipelineWithTranslator INFO:apache_beam.utils.subprocess_server:INFO: Translating pipeline to Flink program. INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment INFO:apache_beam.utils.subprocess_server:INFO: Creating a Batch Execution Environment. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment INFO:apache_beam.utils.subprocess_server:INFO: Using Flink Master URL localhost:8081. INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM org.apache.flink.api.java.utils.PlanGenerator logTypeRegistrationDetails INFO:apache_beam.utils.subprocess_server:INFO: The job has 0 registered types and 0 default Kryo serializers INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:39 AM org.apache.flink.client.program.rest.RestClusterClient lambda$submitJob$7 INFO:apache_beam.utils.subprocess_server:INFO: Submitting job 'sql-transform' (772417f6ae0736da9f7156d334c4b8e7). INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:44 AM org.apache.flink.client.program.rest.RestClusterClient lambda$null$6 INFO:apache_beam.utils.subprocess_server:INFO: Successfully submitted job 'sql-transform' (772417f6ae0736da9f7156d334c4b8e7) to 'http://localhost:8081 '. INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600 INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:34207. INFO:apache_beam.runners.worker.sdk_worker:Control channel established. INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers. INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:37411. INFO:apache_beam.runners.worker.sdk_worker:State channel established. INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:34955 INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:50:09 AM org.apache.beam.runners.jobsubmission.JobInvocation$1 onFailure INFO:apache_beam.utils.subprocess_server:SEVERE: Error during job invocation sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48. INFO:apache_beam.utils.subprocess_server: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 772417f6ae0736da9f7156d334c4b8e7) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null $6(ClusterClientJobClientAdapter.java:130) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java: 642) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$ 6(FutureUtils.java:301) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859 ) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture$UniWhenComplete.tryFire( CompletableFuture.java:837) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.client.program.rest.RestClusterClient.lambda $pollResourceAsync$31(RestClusterClient.java:772) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859 ) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture$UniWhenComplete.tryFire( CompletableFuture.java:837) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$ 6(FutureUtils.java:301) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859 ) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture$UniWhenComplete.tryFire( CompletableFuture.java:837) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java: 1085) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) INFO:apache_beam.utils.subprocess_server: at java.base/java. lang.Thread.run(Thread.java:829) INFO:apache_beam.utils.subprocess_server:Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult( JobResult.java:144) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null $6(ClusterClientJobClientAdapter.java:128) INFO:apache_beam.utils.subprocess_server: ... 23 more INFO:apache_beam.utils.subprocess_server:Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure (ExecutionFailureHandler.java:139) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult (ExecutionFailureHandler.java:83) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure( DefaultScheduler.java:256) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure( DefaultScheduler.java:247) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed( DefaultScheduler.java:240) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate( SchedulerBase.java:738) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState( SchedulerBase.java:715) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState( SchedulerNG.java:78) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState( JobMaster.java:477) INFO:apache_beam.utils.subprocess_server: at jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) INFO:apache_beam.utils.subprocess_server: at java.base/jdk. internal.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) INFO:apache_beam.utils.subprocess_server: at java.base/java. lang.reflect.Method.invoke(Method.java:566) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1( AkkaRpcActor.java:309) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader (ClassLoadingUtils.java:83) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( AkkaRpcActor.java:307) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage( AkkaRpcActor.java:222) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage( FencedAkkaRpcActor.java:84) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage( AkkaRpcActor.java:168) INFO:apache_beam.utils.subprocess_server: at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) INFO:apache_beam.utils.subprocess_server: at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) INFO:apache_beam.utils.subprocess_server: at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction$ OrElse.applyOrElse(PartialFunction.scala:171) INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction$ OrElse.applyOrElse(PartialFunction.scala:172) INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction$ OrElse.applyOrElse(PartialFunction.scala:172) INFO:apache_beam.utils.subprocess_server: at akka.actor.Actor.aroundReceive(Actor.scala:537) INFO:apache_beam.utils.subprocess_server: at akka.actor.Actor.aroundReceive$(Actor.scala:535) INFO:apache_beam.utils.subprocess_server: at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) INFO:apache_beam.utils.subprocess_server: at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) INFO:apache_beam.utils.subprocess_server: at akka.actor.ActorCell.invoke(ActorCell.scala:548) INFO:apache_beam.utils.subprocess_server: at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) INFO:apache_beam.utils.subprocess_server: at akka.dispatch.Mailbox.run (Mailbox.scala:231) INFO:apache_beam.utils.subprocess_server: at akka.dispatch.Mailbox.exec(Mailbox.scala:243) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) INFO:apache_beam.utils.subprocess_server: at java.base/java. util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.lang.IllegalStateException: No container running for id c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring( Task.java:935) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) INFO:apache_beam.utils.subprocess_server: at java.base/java. lang.Thread.run(Thread.java:829) INFO:apache_beam.utils.subprocess_server:Caused by: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ Segment.get(LocalCache.java:2086) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get (LocalCache.java:4012) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad (LocalCache.java:4035) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ LocalLoadingCache.get(LocalCache.java:5013) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ LocalLoadingCache.getUnchecked(LocalCache.java:5020) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory $SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory $SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage (DefaultJobBundleFactory.java:310) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory (DefaultExecutableStageContext.java:38) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory $WrappedContext.getStageBundleFactory( ReferenceCountingExecutableStageContextFactory.java:207) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open (FlinkExecutableStageFunction.java:157) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction( FunctionUtils.java:34) INFO:apache_beam.utils.subprocess_server: at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) INFO:apache_beam.utils.subprocess_server: ... 6 more INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.IllegalStateException: No container running for id c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment (DockerEnvironmentFactory.java:137) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load( DefaultJobBundleFactory.java:259) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load( DefaultJobBundleFactory.java:232) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ LoadingValueReference.loadFuture(LocalCache.java:3571) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ Segment.loadSync(LocalCache.java:2313) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ Segment.lockedGetOrLoad(LocalCache.java:2190) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$ Segment.get(LocalCache.java:2080) INFO:apache_beam.utils.subprocess_server: ... 18 more INFO:apache_beam.utils.subprocess_server: Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800'. stderr: Error response from daemon: Cannot kill container: c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800: Container c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 is not running INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand (DockerCommand.java:255) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand (DockerCommand.java:181) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer( DockerCommand.java:161) INFO:apache_beam.utils.subprocess_server: at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment (DockerEnvironmentFactory.java:161) INFO:apache_beam.utils.subprocess_server: ... 24 more INFO:apache_beam.utils.subprocess_server: ERROR:root:java.lang.IllegalStateException: No container running for id c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED Traceback (most recent call last): File "/home/jaehyeon/projects/general-demos/beam-dev-env/section3/sql_transform.py", line 72, in <module> run() File "/home/jaehyeon/projects/general-demos/beam-dev-env/section3/sql_transform.py", line 68, in run p.run().wait_until_finish() File "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 576, in wait_until_finish raise self._runtime_exception RuntimeError: Pipeline sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 failed in state FAILED: java.lang.IllegalStateException: No container running for id c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 On Tue, 12 Mar 2024 at 15:07, Jaehyeon Kim <dott...@gmail.com> wrote: > Hello, > > ✔️ I have a simple pipeline that transforms data with *SqlTransform*. I > use the *FlinkRunner *and, when I don't specify the *flink_master *option > and use an embedded flink cluster, it works fine. However, if I use a local > flink cluster and specify the *flink_master *option to *localhost:8081*, > the expansion service running on Docker doesn't work. The flink cluster > gets started locally without using Docker ( > *./setup/flink-1.16.3/bin/start-cluster.sh*). > > ✔️ The pipeline code can be found below and I added some troubleshooting > details below it. > > import argparse > import logging > import typing > > import apache_beam as beam > from apache_beam.transforms.sql import SqlTransform > from apache_beam.options.pipeline_options import PipelineOptions > from apache_beam.options.pipeline_options import SetupOptions > > class MyItem(typing.NamedTuple): > id: int > name: str > value: float > > beam.coders.registry.register_coder(MyItem, beam.coders.RowCoder) > > def convert_to_item(row: list): > cols = ["id", "name", "value"] > return MyItem(**dict(zip(cols, row))) > > def run(): > parser = argparse.ArgumentParser( > description="Process statistics by user from website visit event" > ) > parser.add_argument( > "--runner", default="FlinkRunner", help="Specify Apache Beam > Runner" > ) > parser.add_argument( > "--use_own", > action="store_true", > default="Flag to indicate whether to use an own local cluster", > ) > opts = parser.parse_args() > > options = PipelineOptions() > pipeline_opts = { > "runner": opts.runner, > "job_name": "sql-transform", > "environment_type": "LOOPBACK", > } > if opts.use_own is True: > pipeline_opts = {**pipeline_opts, **{"flink_master": > "localhost:8081"}} > print(pipeline_opts) > options = PipelineOptions([], **pipeline_opts) > # Required, else it will complain that when importing worker functions > options.view_as(SetupOptions).save_main_session = True > > query = """ > SELECT * FROM PCOLLECTION WHERE name = 'jack' > """ > > p = beam.Pipeline(options=options) > ( > p > | beam.Create([[1, "john", 123], [2, "jane", 234], [3, "jack", 345 > ]]) > | beam.Map(convert_to_item).with_output_types(MyItem) > | SqlTransform(query) > | beam.Map(print) > ) > > logging.getLogger().setLevel(logging.WARN) > logging.info("Building pipeline ...") > > p.run().wait_until_finish() > > > if __name__ == "__main__": > run() > > ✔️ When I check the expansion service docker container, normally it > downloads a JAR file and starts SDK Fn Harness. However it doesn't move > into the download step when I specify the *flink_master *to > *localhost:8081*. As the service container gets stuck, the flink task > manager considers it is lost and the container gets killed. > > 2024/03/12 03:49:23 Provision info: > pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1" > ... > runner_capabilities:"beam:protocol:control_response_elements_embedding:v1" > *2024/03/12 03:49:24 Downloaded: > /tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar > (sha256:fb9986c268c434b7357cf59674383f4b60123c1f163984e7500464fc8848fbf0, > size: 281440385)* > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/apache/beam/jars/slf4j-jdk14.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory] > SDK Fn Harness started > Harness ID 1-2 > Logging location url: "localhost:42133" > > Control location url: "localhost:36449" > > Status location null > Pipeline Options File pipeline_options.json > Pipeline Options File pipeline_options.json exists. Overriding existing > options. > Pipeline options > {"beam:option:allow_non_deterministic_key_coders:v1":false,..."beam:option:verify_row_values:v1":false} > > ✔️ The difference with/without the *flink_master *option is quite minimal > in the pipeline options as shown below. However I'm not sure what makes it > fails to run the pipeline successfully. > > without flink_master - fields:{key:"beam:option:flink_master:v1" > value:{string_value:"[auto]"}} > with flink_master - fields:{key:"beam:option:flink_master:v1" > value:{string_value:"http://localhost:8081"}} > > Can you please inform me how to fix the issue? > > Cheers, > Jaehyeon >