Hello,

The output below includes more logging details.

WARNING:root:Waiting for grpc channel to be ready at localhost:45371.
INFO:root:Building pipeline ...
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at
localhost:36959
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
<function pack_combiners at 0x7fd0ac33d630> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
<function lift_combiners at 0x7fd0ac33d6c0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
<function sort_stages at 0x7fd0ac33de10> ====================
INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol
scheme to flink_master parameter: http://localhost:8081
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16-job-server/2.53.0/beam-runners-flink-1.16-job-server-2.53.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ('java'
'-jar'
'/home/jaehyeon/.apache_beam/cache/jars/beam-runners-flink-1.16-job-server-2.53.0.jar'
'--flink-master' 'http://localhost:8081' '--artifacts-dir'
'/tmp/beam-temp80sx9jtu/artifactsv2s2mbvo' '--job-port' '50353'
'--artifact-port' '0' '--expansion-port' '0')
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM
org.apache.beam.runners.jobsubmission.JobServerDriver
createArtifactStagingService
INFO:apache_beam.utils.subprocess_server:INFO: ArtifactStagingService
started on localhost:38121
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM
org.apache.beam.runners.jobsubmission.JobServerDriver createExpansionService
INFO:apache_beam.utils.subprocess_server:INFO: Java ExpansionService
started on localhost:36431
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM
org.apache.beam.runners.jobsubmission.JobServerDriver createJobServer
INFO:apache_beam.utils.subprocess_server:INFO: JobService started on
localhost:50353
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM
org.apache.beam.runners.jobsubmission.JobServerDriver run
INFO:apache_beam.utils.subprocess_server:INFO: Job server now running,
terminate with Ctrl+C
WARNING:root:Waiting for grpc channel to be ready at localhost:50353.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext
INFO:apache_beam.utils.subprocess_server:INFO: Staging artifacts for
job_ddf6a897-788d-49ca-8d14-51029ce17f58.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2
resolveNextEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Resolving artifacts for
job_ddf6a897-788d-49ca-8d14-51029ce17f58.0
:ref_Environment_default_environment_2.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext
INFO:apache_beam.utils.subprocess_server:INFO: Getting 1 artifacts for
job_ddf6a897-788d-49ca-8d14-51029ce17f58.0:external_1beam:env:docker:v1.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2
resolveNextEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Resolving artifacts for
job_ddf6a897-788d-49ca-8d14-51029ce17f58.0:external_1beam:env:docker:v1.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext
INFO:apache_beam.utils.subprocess_server:INFO: Getting 1 artifacts for
job_ddf6a897-788d-49ca-8d14-51029ce17f58.null.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2
finishStaging
INFO:apache_beam.utils.subprocess_server:INFO: Artifacts fully staged for
job_ddf6a897-788d-49ca-8d14-51029ce17f58.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM
org.apache.beam.runners.flink.FlinkJobInvoker invokeWithExecutor
INFO:apache_beam.utils.subprocess_server:INFO: Invoking job
sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 with pipeline runner
org.apache.beam.runners.flink.FlinkPipelineRunner@2f4dfe06
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM
org.apache.beam.runners.jobsubmission.JobInvocation start
INFO:apache_beam.utils.subprocess_server:INFO: Starting job invocation
sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK"
has started a component necessary for the execution. Be sure to run the
pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM
org.apache.beam.runners.flink.FlinkPipelineRunner runPipelineWithTranslator
INFO:apache_beam.utils.subprocess_server:INFO: Translating pipeline to
Flink program.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to
STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to
STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to
RUNNING
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM
org.apache.beam.runners.flink.FlinkExecutionEnvironments
createBatchExecutionEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Creating a Batch Execution
Environment.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM
org.apache.beam.runners.flink.FlinkExecutionEnvironments
createBatchExecutionEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Using Flink Master URL
localhost:8081.
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM
org.apache.flink.api.java.utils.PlanGenerator logTypeRegistrationDetails
INFO:apache_beam.utils.subprocess_server:INFO: The job has 0 registered
types and 0 default Kryo serializers
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:39 AM
org.apache.flink.client.program.rest.RestClusterClient lambda$submitJob$7
INFO:apache_beam.utils.subprocess_server:INFO: Submitting job
'sql-transform' (772417f6ae0736da9f7156d334c4b8e7).
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:44 AM
org.apache.flink.client.program.rest.RestClusterClient lambda$null$6
INFO:apache_beam.utils.subprocess_server:INFO: Successfully submitted job
'sql-transform' (772417f6ae0736da9f7156d334c4b8e7) to 'http://localhost:8081
'.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size
104857600
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control
channel for localhost:34207.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with
unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel
for localhost:37411.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for
localhost:34955
INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:50:09 AM
org.apache.beam.runners.jobsubmission.JobInvocation$1 onFailure
INFO:apache_beam.utils.subprocess_server:SEVERE: Error during job
invocation sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48.
INFO:apache_beam.utils.subprocess_server:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 772417f6ae0736da9f7156d334c4b8e7)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null
$6(ClusterClientJobClientAdapter.java:130)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:
642)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$
6(FutureUtils.java:301)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
CompletableFuture.java:837)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.client.program.rest.RestClusterClient.lambda
$pollResourceAsync$31(RestClusterClient.java:772)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
CompletableFuture.java:837)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$
6(FutureUtils.java:301)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
CompletableFuture.java:837)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:
1085)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
lang.Thread.run(Thread.java:829)
INFO:apache_beam.utils.subprocess_server:Caused by:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(
JobResult.java:144)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null
$6(ClusterClientJobClientAdapter.java:128)
INFO:apache_beam.utils.subprocess_server:       ... 23 more
INFO:apache_beam.utils.subprocess_server:Caused by:
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure
(ExecutionFailureHandler.java:139)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult
(ExecutionFailureHandler.java:83)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(
DefaultScheduler.java:256)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(
DefaultScheduler.java:247)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(
DefaultScheduler.java:240)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(
SchedulerBase.java:738)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(
SchedulerBase.java:715)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(
SchedulerNG.java:78)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(
JobMaster.java:477)
INFO:apache_beam.utils.subprocess_server:       at
jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
INFO:apache_beam.utils.subprocess_server:       at java.base/jdk.
internal.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
lang.reflect.Method.invoke(Method.java:566)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(
AkkaRpcActor.java:309)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader
(ClassLoadingUtils.java:83)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
AkkaRpcActor.java:307)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:222)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(
FencedAkkaRpcActor.java:84)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:168)
INFO:apache_beam.utils.subprocess_server:       at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
INFO:apache_beam.utils.subprocess_server:       at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
INFO:apache_beam.utils.subprocess_server:       at
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
INFO:apache_beam.utils.subprocess_server:       at
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
INFO:apache_beam.utils.subprocess_server:       at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
INFO:apache_beam.utils.subprocess_server:       at scala.PartialFunction$
OrElse.applyOrElse(PartialFunction.scala:171)
INFO:apache_beam.utils.subprocess_server:       at scala.PartialFunction$
OrElse.applyOrElse(PartialFunction.scala:172)
INFO:apache_beam.utils.subprocess_server:       at scala.PartialFunction$
OrElse.applyOrElse(PartialFunction.scala:172)
INFO:apache_beam.utils.subprocess_server:       at
akka.actor.Actor.aroundReceive(Actor.scala:537)
INFO:apache_beam.utils.subprocess_server:       at
akka.actor.Actor.aroundReceive$(Actor.scala:535)
INFO:apache_beam.utils.subprocess_server:       at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
INFO:apache_beam.utils.subprocess_server:       at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
INFO:apache_beam.utils.subprocess_server:       at
akka.actor.ActorCell.invoke(ActorCell.scala:548)
INFO:apache_beam.utils.subprocess_server:       at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
INFO:apache_beam.utils.subprocess_server:       at akka.dispatch.Mailbox.run
(Mailbox.scala:231)
INFO:apache_beam.utils.subprocess_server:       at
akka.dispatch.Mailbox.exec(Mailbox.scala:243)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.Exception:
The user defined 'open()' method caused an exception:
java.lang.IllegalStateException: No container running for id
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
Task.java:935)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
INFO:apache_beam.utils.subprocess_server:       at java.base/java.
lang.Thread.run(Thread.java:829)
INFO:apache_beam.utils.subprocess_server:Caused by:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: No container running for id
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
Segment.get(LocalCache.java:2086)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get
(LocalCache.java:4012)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad
(LocalCache.java:4035)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
LocalLoadingCache.get(LocalCache.java:5013)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
LocalLoadingCache.getUnchecked(LocalCache.java:5020)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory
$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory
$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage
(DefaultJobBundleFactory.java:310)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory
(DefaultExecutableStageContext.java:38)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory
$WrappedContext.getStageBundleFactory(
ReferenceCountingExecutableStageContextFactory.java:207)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open
(FlinkExecutableStageFunction.java:157)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(
FunctionUtils.java:34)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
INFO:apache_beam.utils.subprocess_server:       ... 6 more
INFO:apache_beam.utils.subprocess_server:Caused by:
java.lang.IllegalStateException: No container running for id
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment
(DockerEnvironmentFactory.java:137)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(
DefaultJobBundleFactory.java:259)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(
DefaultJobBundleFactory.java:232)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
LoadingValueReference.loadFuture(LocalCache.java:3571)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
Segment.loadSync(LocalCache.java:2313)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
Segment.lockedGetOrLoad(LocalCache.java:2190)
INFO:apache_beam.utils.subprocess_server:       at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$
Segment.get(LocalCache.java:2080)
INFO:apache_beam.utils.subprocess_server:       ... 18 more
INFO:apache_beam.utils.subprocess_server:       Suppressed:
java.io.IOException: Received exit code 1 for command 'docker kill
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800'. stderr:
Error response from daemon: Cannot kill container:
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800: Container
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 is not
running
INFO:apache_beam.utils.subprocess_server:               at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand
(DockerCommand.java:255)
INFO:apache_beam.utils.subprocess_server:               at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand
(DockerCommand.java:181)
INFO:apache_beam.utils.subprocess_server:               at
org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(
DockerCommand.java:161)
INFO:apache_beam.utils.subprocess_server:               at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment
(DockerEnvironmentFactory.java:161)
INFO:apache_beam.utils.subprocess_server:               ... 24 more
INFO:apache_beam.utils.subprocess_server:
ERROR:root:java.lang.IllegalStateException: No container running for id
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800
INFO:apache_beam.runners.portability.portable_runner:Job state changed to
FAILED
Traceback (most recent call last):
  File
"/home/jaehyeon/projects/general-demos/beam-dev-env/section3/sql_transform.py",
line 72, in <module>
    run()
  File
"/home/jaehyeon/projects/general-demos/beam-dev-env/section3/sql_transform.py",
line 68, in run
    p.run().wait_until_finish()
  File
"/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
line 576, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48
failed in state FAILED: java.lang.IllegalStateException: No container
running for id
c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800
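
To make the nested exceptions above easier to follow, here is a minimal sketch that pulls the "Caused by:" chain out of a log like this one. It assumes every record starts with the default Python logging prefix ("LEVEL:logger.name:") and that any line without that prefix is a continuation wrapped by the mail client. The helper names are hypothetical and are not part of Beam.

```python
import re

# Matches the "LEVEL:logger.name:" prefix of Python's default log format,
# e.g. "INFO:apache_beam.utils.subprocess_server:".
RECORD_PREFIX = re.compile(r"^(DEBUG|INFO|WARNING|ERROR|CRITICAL):[\w.]+:")


def join_wrapped_records(lines):
    """Merge wrapped continuation lines back into whole log records."""
    records = []
    for line in lines:
        text = line.strip()
        if not text:
            continue  # skip blank lines
        if RECORD_PREFIX.match(text) or not records:
            records.append(text)  # a new record starts here
        else:
            records[-1] += " " + text  # continuation of the previous record
    return records


def caused_by_chain(lines):
    """Return the 'Caused by:' messages in order, outermost first."""
    marker = "Caused by:"
    chain = []
    for record in join_wrapped_records(lines):
        message = RECORD_PREFIX.sub("", record, count=1).strip()
        if message.startswith(marker):
            chain.append(message[len(marker):].strip())
    return chain
```

The last element of the returned list is the innermost cause, which for the log above is the "No container running for id ..." IllegalStateException.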


On Tue, 12 Mar 2024 at 15:07, Jaehyeon Kim <dott...@gmail.com> wrote:

> Hello,
>
> ✔️ I have a simple pipeline that transforms data with *SqlTransform*. I
> use the *FlinkRunner* and, when I don't specify the *flink_master* option
> and use an embedded Flink cluster, it works fine. However, if I use a local
> Flink cluster and set the *flink_master* option to *localhost:8081*,
> the expansion service running on Docker doesn't work. The Flink cluster
> is started locally without Docker (
> *./setup/flink-1.16.3/bin/start-cluster.sh*).
>
> ✔️ The pipeline code can be found below and I added some troubleshooting
> details below it.
>
> import argparse
> import logging
> import typing
>
> import apache_beam as beam
> from apache_beam.transforms.sql import SqlTransform
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
>
> class MyItem(typing.NamedTuple):
>     id: int
>     name: str
>     value: float
>
> beam.coders.registry.register_coder(MyItem, beam.coders.RowCoder)
>
> def convert_to_item(row: list):
>     cols = ["id", "name", "value"]
>     return MyItem(**dict(zip(cols, row)))
>
> def run():
>     parser = argparse.ArgumentParser(
>         description="Process statistics by user from website visit event"
>     )
>     parser.add_argument(
>         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>     )
>     parser.add_argument(
>         "--use_own",
>         action="store_true",
>         help="Flag to indicate whether to use an own local cluster",
>     )
>     opts = parser.parse_args()
>
>     options = PipelineOptions()
>     pipeline_opts = {
>         "runner": opts.runner,
>         "job_name": "sql-transform",
>         "environment_type": "LOOPBACK",
>     }
>     if opts.use_own is True:
>         pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
>     print(pipeline_opts)
>     options = PipelineOptions([], **pipeline_opts)
>     # Required; otherwise it complains when importing worker functions
>     options.view_as(SetupOptions).save_main_session = True
>
>     query = """
>     SELECT * FROM PCOLLECTION WHERE name = 'jack'
>     """
>
>     p = beam.Pipeline(options=options)
>     (
>         p
>         | beam.Create([[1, "john", 123], [2, "jane", 234], [3, "jack", 345]])
>         | beam.Map(convert_to_item).with_output_types(MyItem)
>         | SqlTransform(query)
>         | beam.Map(print)
>     )
>
>     logging.getLogger().setLevel(logging.INFO)
>     logging.info("Building pipeline ...")
>
>     p.run().wait_until_finish()
>
>
> if __name__ == "__main__":
>     run()
>
> ✔️ When I check the expansion service Docker container, it normally
> downloads a JAR file and starts the SDK Fn Harness. However, it doesn't
> reach the download step when I set *flink_master* to
> *localhost:8081*. As the service container gets stuck, the Flink task
> manager considers it lost and the container gets killed.
>
> 2024/03/12 03:49:23 Provision info:
> pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1"
> ...
> runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
> *2024/03/12 03:49:24 Downloaded:
> /tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar
> (sha256:fb9986c268c434b7357cf59674383f4b60123c1f163984e7500464fc8848fbf0,
> size: 281440385)*
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/apache/beam/jars/slf4j-jdk14.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
> SDK Fn Harness started
> Harness ID 1-2
> Logging location url: "localhost:42133"
>
> Control location url: "localhost:36449"
>
> Status location null
> Pipeline Options File pipeline_options.json
> Pipeline Options File pipeline_options.json exists. Overriding existing
> options.
> Pipeline options
> {"beam:option:allow_non_deterministic_key_coders:v1":false,..."beam:option:verify_row_values:v1":false}
>
> ✔️ The difference with and without the *flink_master* option is quite
> minimal in the pipeline options, as shown below. However, I'm not sure
> what makes the pipeline fail to run.
>
> without flink_master - fields:{key:"beam:option:flink_master:v1"
> value:{string_value:"[auto]"}}
> with flink_master      - fields:{key:"beam:option:flink_master:v1"
> value:{string_value:"http://localhost:8081"}}
>
> Can you please inform me how to fix the issue?
>
> Cheers,
> Jaehyeon
>
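
On the option difference quoted above: the code sets flink_master to "localhost:8081", yet the pipeline options show "http://localhost:8081", and the log contains "Adding HTTP protocol scheme to flink_master parameter". The following is only a sketch of that kind of normalisation, not Beam's actual implementation. The function name is hypothetical, and leaving the "[auto]" placeholder untouched is an assumption based on the option value shown above.

```python
import re


def with_http_scheme(flink_master: str) -> str:
    """Prefix an address with http:// unless it already has a scheme."""
    address = flink_master.strip()
    # Assumption: "[auto]" is a placeholder rather than a real address,
    # so it is returned unchanged.
    if address == "[auto]":
        return address
    if re.match(r"^https?://", address):
        return address
    return "http://" + address


print(with_http_scheme("localhost:8081"))
```

If this matches what the runner does, the value passed in the code and the value in the pipeline options refer to the same Flink REST endpoint.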
