kishanshukla-2307 opened a new issue, #36539:
URL: https://github.com/apache/beam/issues/36539

   ### What happened?
   
   I’m trying to run a Go → Kafka → Go (KafkaIO) proof of concept using the Apache Beam Go SDK v2.68 with the Flink portable runner.
   The job starts successfully and launches two SDK harnesses (Go and Java), but the KafkaIO read transform never emits elements: the job stays idle and periodically logs "Operation ongoing in bundle … without outputting or completing."
   Messages published to the input topic are never seen by the Go ParDo, and nothing is written to the output topic.
   
   
   Environment

   | Component | Version / Setup |
   |---|---|
   | Beam Go SDK | 2.68.0 |
   | Flink Runner | 1.19 (MiniCluster via `beam-runners-flink-1.19-job-server-2.68.0.jar`) |
   | Expansion Service | `beam-sdks-java-io-expansion-service-2.68.0.jar` |
   | Kafka | v4.0.0, local (`localhost:9092`) |
   | Harness environment | Docker (`apache/beam_go_sdk:2.68.0`, `apache/beam_java17_sdk:2.68.0`) |
   | OS | macOS (Docker Desktop) |
   
   Runner options used: `--runner=portable --endpoint=localhost:8099 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0 --expansion_addr=localhost:8088`
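   Note that Go's standard `flag` package, which the pipeline's `flag.Parse()` call uses, treats a bare `--` as the end of the flags. A stray space such as `-- environment_config=...` therefore turns every later option into a positional argument without any error. The sketch below reproduces this with a private `flag.FlagSet`; `parseRunnerFlags` is a hypothetical helper that declares only two of the pipeline's flags.

   ```go
   package main

   import (
       "flag"
       "fmt"
   )

   // parseRunnerFlags (hypothetical) parses args with a private FlagSet so the
   // example does not touch the global flags. Only two options are declared.
   func parseRunnerFlags(args []string) (envConfig, expansionAddr string, rest []string, err error) {
       fs := flag.NewFlagSet("pipeline", flag.ContinueOnError)
       env := fs.String("environment_config", "", "SDK harness container image")
       exp := fs.String("expansion_addr", "", "Address of Kafka expansion service")
       if err := fs.Parse(args); err != nil {
           return "", "", nil, err
       }
       // fs.Args() holds whatever was not consumed as a flag.
       return *env, *exp, fs.Args(), nil
   }

   func main() {
       // Well-formed: both options are recognized as flags.
       env, exp, rest, _ := parseRunnerFlags([]string{
           "--environment_config=apache/beam_go_sdk:2.68.0",
           "--expansion_addr=localhost:8088",
       })
       fmt.Printf("env=%q exp=%q rest=%q\n", env, exp, rest)

       // Stray space: the bare "--" stops flag parsing, so both options stay
       // empty and end up in rest as positional arguments.
       env, exp, rest, _ = parseRunnerFlags([]string{
           "--", "environment_config=apache/beam_go_sdk:2.68.0",
           "--expansion_addr=localhost:8088",
       })
       fmt.Printf("env=%q exp=%q rest=%q\n", env, exp, rest)
   }
   ```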
   
   What works
   - Go-only pipelines (beam.Create → ParDo(log)) complete successfully.
   - Kafka producer and consumer work fine outside Beam.
   - Two SDK harnesses start as expected (Go + Java).
   - The Java harness logs Kafka producer initialization.
   
   What fails
   - KafkaIO.Read emits no elements to the Go ParDo.
   - No messages appear on the output topic (when KafkaIO.Write is enabled).
   - Go DoFn logs (log.Infof) never appear.
   - The job stays active and keeps emitting the "Operation ongoing in bundle" warning quoted above.
   
   My pipeline code:
   ```go
   package main
   
   import (
       "context"
       "flag"
   
       "github.com/apache/beam/sdks/v2/go/pkg/beam"
       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
       "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )
   
   const (
       kafkaBroker = "localhost:9092"
       inputTopic  = "input"
       outputTopic = "output"
   )
   
   var expansionAddr = flag.String("expansion_addr", "", "Address of Kafka expansion service")
   
   // Register the DoFn so the Go harness can resolve it when the
   // portable runner executes the pipeline remotely.
   func init() {
       register.Function3x2(logRecord)
   }
   
   // logRecord logs each Kafka record and passes it through unchanged.
   func logRecord(ctx context.Context, k, v []byte) ([]byte, []byte) {
       log.Infof(ctx, "got record key=%q val=%q", string(k), string(v))
       return k, v
   }
   
   func main() {
       flag.Parse()
       beam.Init()
       ctx := context.Background()
       p, s := beam.NewPipelineWithRoot()
   
       // Cross-language read; yields a PCollection of KV<[]byte, []byte>.
       input := kafkaio.Read(
           s,
           *expansionAddr,
           kafkaBroker,
           []string{inputTopic},
       )
   
       beam.ParDo(s, logRecord, input)
       kafkaio.Write(s, *expansionAddr, kafkaBroker, outputTopic, input)
   
       if err := beamx.Run(ctx, p); err != nil {
           log.Exitf(ctx, "Pipeline failed: %v", err)
       }
   }
   ```
   
   
   
   Reproduction Steps
   1. Start Kafka locally and create the `input` and `output` topics.
   2. Run the Kafka expansion service: `java -jar beam-sdks-java-io-expansion-service-2.68.0.jar 8088 --javaClassLookupAllowlistFile='*'`
   3. Start the Flink job server: `java -jar beam-runners-flink-1.19-job-server-2.68.0.jar --job-port=8099 --artifact-port=8098`
   4. Run the Go pipeline: `go run . --runner=portable --endpoint=localhost:8099 --expansion_addr=localhost:8088 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0`
   5. Produce a few messages to the Kafka `input` topic.
   6. Observe that no messages are logged and no output is produced.
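   One setup detail worth ruling out (an assumption, not a confirmed cause): with `--environment_type=DOCKER` both harnesses run in containers, and the logs show them reaching the host through `host.docker.internal`. Inside a container, `localhost:9092` normally refers to the container itself rather than the macOS host. The logged `Cluster ID` suggests that at least one metadata request succeeded, so this is only one thing to check. A minimal stdlib-only TCP probe can be run both on the host and inside a container. `brokerReachable` is a hypothetical helper; it only checks that the port accepts connections and cannot detect misconfigured Kafka advertised listeners.

   ```go
   package main

   import (
       "fmt"
       "net"
       "os"
       "time"
   )

   // brokerReachable (hypothetical) reports whether a TCP connection to addr
   // can be opened within timeout. It does not speak the Kafka protocol.
   func brokerReachable(addr string, timeout time.Duration) error {
       conn, err := net.DialTimeout("tcp", addr, timeout)
       if err != nil {
           return err
       }
       return conn.Close()
   }

   func main() {
       // Pass the broker address as the first argument, e.g. "localhost:9092"
       // on the host or "host.docker.internal:9092" inside a container.
       addr := "localhost:9092"
       if len(os.Args) > 1 {
           addr = os.Args[1]
       }
       if err := brokerReachable(addr, 2*time.Second); err != nil {
           fmt.Printf("%s: unreachable: %v\n", addr, err)
           os.Exit(1)
       }
       fmt.Printf("%s: reachable\n", addr)
   }
   ```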
        
   
   Logs
   
   Job server (log excerpt):
   ```
   Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
   INFO: initializing Kafka metrics collector
   Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
   INFO: Kafka version: 3.9.0
   Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
   INFO: Kafka commitId: 84caaa6e9da06435
   Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
   INFO: Kafka startTimeMs: 1760626427960
   Oct 16, 2025 8:23:47 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
   INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_904017666_none-3, groupId=Reader-0_offset_consumer_904017666_none] Cluster ID: 5L6g3nShT-eMCtK--X86sw
   Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService getProcessBundleDescriptor
   INFO: getProcessBundleDescriptor request with id 1-3
   Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
   WARNING: Failed to parse element_processing_timeout: time: invalid duration "", there will be no timeout for processing an element in a PTransform operation
   Oct 16, 2025 8:23:49 PM org.apache.beam.runners.fnexecution.data.GrpcDataService data
   INFO: Beam Fn Data client connected.
   ...
   WARNING: Operation ongoing in bundle 1 for at least 05m00s without outputting or completing
   ```
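   The `Failed to parse element_processing_timeout` warning in this excerpt is most likely benign. Its text matches the error Go's `time.ParseDuration` returns for an empty string, which suggests the option was simply left unset, and the message itself says no timeout will be applied. A minimal sketch, assuming the Go harness parses this option with `time.ParseDuration`; `parseTimeout` is a hypothetical helper, not Beam code:

   ```go
   package main

   import (
       "fmt"
       "time"
   )

   // parseTimeout (hypothetical) parses an option value with time.ParseDuration
   // and returns 0, meaning "no timeout", together with the error on failure.
   func parseTimeout(opt string) (time.Duration, error) {
       d, err := time.ParseDuration(opt)
       if err != nil {
           return 0, err
       }
       return d, nil
   }

   func main() {
       // An unset option arrives as "", which ParseDuration rejects with
       // the same message that appears in the WARNING line.
       for _, opt := range []string{"", "5m"} {
           d, err := parseTimeout(opt)
           fmt.Printf("opt=%q duration=%v err=%v\n", opt, d, err)
       }
   }
   ```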
   
   
   
   Go harness:
   ```
   2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
   2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1" 
value:{string_value:"go-job-1-1760626399474061000"}} 
fields:{key:"beam:option:experiments:v1" 
value:{list_value:{values:{string_value:"beam_fn_api"}}}} 
fields:{key:"beam:option:flink_conf_dir:v1" value:{null_value:NULL_VALUE}} 
fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}} 
fields:{key:"beam:option:go_options:v1" 
value:{struct_value:{fields:{key:"options" 
value:{struct_value:{fields:{key:"endpoint" 
value:{string_value:"localhost:8099"}} fields:{key:"environment_config" 
value:{string_value:"apache/beam_go_sdk:2.68.0"}} 
fields:{key:"environment_type" value:{string_value:"DOCKER"}} 
fields:{key:"expansion_addr" value:{string_value:"localhost:8088"}} 
fields:{key:"hookOrder" value:{string_value:"[\"default_remote_logging\"]"}} 
fields:{key:"hooks" value:{string_value:"{\"default_remote_logging\":null}"}} 
fields:{key:"runner" value:{string_value:"portable"}}}}}}}} 
fields:{key:"beam:option:job_name:v1" 
value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}} 
fields:{key:"beam:option:options_id:v1" value:{number_value:2}} 
fields:{key:"beam:option:output_executable_path:v1" 
value:{null_value:NULL_VALUE}} fields:{key:"beam:option:parallelism:v1" 
value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1" 
value:{bool_value:false}} fields:{key:"beam:option:runner:v1" 
value:{null_value:NULL_VALUE}}} 
retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b"
 logging_endpoint:{url:"host.docker.internal:61976"} 
artifact_endpoint:{url:"host.docker.internal:61978"} 
control_endpoint:{url:"host.docker.internal:61974"} 
dependencies:{type_urn:"beam:artifact:type:file:v1" 
type_payload:"\n\xdc\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/1-0:go-/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/worker-1-1760626399474073000"
 role_urn:"beam:artifact:role:go_worker_binary:v1"} 
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
   2025-10-16 20:23:40 2025/10/16 14:53:40 Downloaded: 
/tmp/staged/1-worker-1-1760626399474073000 (sha256: 
4e8c52729f472480c02833507b225ea334663bea8426cd623e2afc6993ed630f, size: 
67584651)
   2025-10-16 20:23:43 Error Setting Rlimit  operation not permitted
   ```
   
   
   Java harness:
   ```
   2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
   2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1"  
value:{string_value:"go-job-1-1760626399474061000"}}  
fields:{key:"beam:option:experiments:v1"  
value:{list_value:{values:{string_value:"beam_fn_api"}}}}  
fields:{key:"beam:option:flink_conf_dir:v1"  value:{null_value:NULL_VALUE}}  
fields:{key:"beam:option:flink_master:v1"  value:{string_value:"[auto]"}}  
fields:{key:"beam:option:go_options:v1"  
value:{struct_value:{fields:{key:"options"  
value:{struct_value:{fields:{key:"endpoint"  
value:{string_value:"localhost:8099"}}  fields:{key:"environment_config"  
value:{string_value:"apache/beam_go_sdk:2.68.0"}}  
fields:{key:"environment_type"  value:{string_value:"DOCKER"}}  
fields:{key:"expansion_addr"  value:{string_value:"localhost:8088"}}  
fields:{key:"hookOrder"  value:{string_value:"[\"default_remote_logging\"]"}}  
fields:{key:"hooks"  value:{string_value:"{\"default_remote_logging\":null}"}}  
fields:{key:"runner"  value:{string_value:"portable"}}}}}}}}  
fields:{key:"beam:option:job_name:v1"  
value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}} 
 fields:{key:"beam:option:options_id:v1"  value:{number_value:2}}  
fields:{key:"beam:option:output_executable_path:v1"  
value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:parallelism:v1"  
value:{number_value:-1}}  fields:{key:"beam:option:retain_docker_containers:v1" 
 value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  
value:{null_value:NULL_VALUE}}}  
retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b"
  logging_endpoint:{url:"host.docker.internal:61977"}  
artifact_endpoint:{url:"host.docker.internal:61979"}  
control_endpoint:{url:"host.docker.internal:61975"}  
dependencies:{type_urn:"beam:artifact:type:file:v1"  
type_payload:"\n\xec\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/2-0:pAOCMJEhVBbeam:env:dock-beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjo"  
role_urn:"beam:artifact:role:staging_to:v1"  
role_payload:"\nZbeam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar"}
  runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
   2025-10-16 20:23:44 2025/10/16 14:53:44 Downloaded: 
/tmp/1-1/staged/beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar
 (sha256: 0e53f1ab978f19568ee0c7e99a49f735138e95b363a2beea48a940174d8e043f, 
size: 387766088)
   2025-10-16 20:23:45 Running JvmInitializer#onStartup for 
org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
   2025-10-16 20:23:45 Completed JvmInitializer#onStartup for 
org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
   2025-10-16 20:23:45 SDK Fn Harness started
   2025-10-16 20:23:45 Harness ID 1-1
   2025-10-16 20:23:45 Logging location url:"host.docker.internal:61977"
   2025-10-16 20:23:45 Control location url:"host.docker.internal:61975"
   2025-10-16 20:23:45 Status location null
   2025-10-16 20:23:45 Pipeline Options File pipeline_options.json
   2025-10-16 20:23:45 Pipeline Options File pipeline_options.json exists. 
Overriding existing options.
   2025-10-16 20:23:45 Pipeline options 
{"beam:option:app_name:v1":"go-job-1-1760626399474061000", 
"beam:option:experiments:v1":["beam_fn_api"], 
"beam:option:flink_conf_dir:v1":null, "beam:option:flink_master:v1":"[auto]", 
"beam:option:go_options:v1":{"options":{"endpoint":"localhost:8099", 
"environment_config":"apache/beam_go_sdk:2.68.0", "environment_type":"DOCKER", 
"expansion_addr":"localhost:8088", "hookOrder":"[\"default_remote_logging\"]", 
"hooks":"{\"default_remote_logging\":null}", "runner":"portable"}}, 
"beam:option:job_name:v1":"go0job0101760626399474061000-kishan-1016145325-78a374a3",
 "beam:option:options_id:v1":2, "beam:option:output_executable_path:v1":null, 
"beam:option:parallelism:v1":-1, 
"beam:option:retain_docker_containers:v1":false, "beam:option:runner:v1":null}
   ```
   (The job nevertheless continues running.)
   
   Is KafkaIO fully supported for Go SDK cross-language pipelines in Beam 2.68.0?
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [x] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [x] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [x] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

