charlespnh opened a new issue, #35099:
URL: https://github.com/apache/beam/issues/35099

   ### What happened?
   
   I'm creating a very simple YAML pipeline to write to Kafka (`apache-beam 
2.65.0`):
   ```
   python -m apache_beam.yaml.main 
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_write.yaml
   ```
   
   ```
   pipeline:
     type: chain
     transforms:
       - type: ReadFromText
         name: Read from GCS
         config:
           path: gs://dataflow-samples/shakespeare/kinglear.txt
   
       - type: MapToFields
         name: Build Kafka records
         config:
           language: python
           fields:
             key: b''
             value:
               callable: |
                 def func(row):
                   return row.line.encode('utf-8')
   
       - type: WriteToKafka
         name: Write to Kafka
         config:
           format: RAW
        topic: my-topic
           bootstrap_servers: kafka:9092
   ```
   
   Since `WriteToKafka` with `format: RAW` expects key/value records of raw bytes,
the `MapToFields` transform is there to encode each line into a Kafka record
before writing. But I got this error (toward the end):
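   
   It looks like the `value` field produced by the callable is being inferred as `Any` (`beam:logical:pythonsdk_any:v1`), which the Java expansion service then fails to decode. One workaround I considered (untested; I'm assuming the YAML SDK picks up the callable's return-type annotation when inferring the output schema) is to annotate the function so the field gets a concrete `bytes` type instead:
   
   ```
    - type: MapToFields
      name: Build Kafka records
      config:
        language: python
        fields:
          key: b''
          value:
            callable: |
              def func(row) -> bytes:
                return row.line.encode('utf-8')
   ```
   
   If the annotation isn't honored, the `value` field presumably still comes out as the Python `Any` logical type and the expansion fails the same way.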
   
   ```
   (env) root@2bf12866314d:/home/beam# python -m apache_beam.yaml.main 
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_write.yaml
   INFO:root:Missing pipeline option (runner). Executing pipeline using the 
default runner: DirectRunner.
   Building pipeline...
   INFO:apache_beam.yaml.yaml_transform:Expanding "Read from GCS" at line 5
   INFO:apache_beam.yaml.yaml_transform:Expanding "Build Kafka records" at line 
10
   INFO:apache_beam.yaml.yaml_transform:Expanding "Write to Kafka" at line 21
   INFO:apache_beam.utils.subprocess_server:Using cached job server jar from 
https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.65.0/beam-sdks-java-io-expansion-service-2.65.0.jar
   INFO:root:Starting a JAR-based expansion service from JAR 
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar
   INFO:apache_beam.utils.subprocess_server:Starting service with ['java' 
'-jar' 
'/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar' 
'60293' 
'--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar'
 '--alsoStartLoopbackWorker']
   INFO:apache_beam.utils.subprocess_server:Starting expansion service at 
localhost:60293
   INFO:apache_beam.utils.subprocess_server:May 30, 2025 4:28:25 PM 
org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
   INFO:apache_beam.utils.subprocess_server:INFO: Registering external 
transforms: [beam:transform:org.apache.beam:kafka_write:v2, 
beam:transform:org.apache.beam:kafka_read_with_metadata:v2, 
beam:transform:org.apache.beam:kafka_write:v1, 
beam:transform:combine_grouped_values:v1, 
beam:schematransform:org.apache.beam:iceberg_read:v1, 
beam:transform:combine_globally:v1, beam:external:java:generate_sequence:v1, 
beam:transform:redistribute_by_key:v1, beam:transform:window_into:v1, 
beam:schematransform:org.apache.beam:kafka_read:v1, 
beam:schematransform:org.apache.beam:kafka_write:v1, 
beam:schematransform:org.apache.beam:iceberg_cdc_read:v1, 
beam:transform:combine_per_key:v1, 
beam:transform:org.apache.beam:kafka_read_with_metadata:v1, 
beam:transform:group_by_key:v1, beam:transform:group_into_batches:v1, 
beam:transform:group_into_batches_with_sharded_key:v1, 
beam:transform:create_view:v1, beam:transform:teststream:v1, 
beam:transform:sdf_process_keyed_elements:v1, beam:schematransform:org.ap
 ache.beam:iceberg_write:v1, beam:transform:flatten:v1, 
beam:schematransform:org.apache.beam:tfrecord_read:v1, 
beam:transform:impulse:v1, beam:transform:write_files:v1, 
beam:runners_core:transforms:splittable_process:v1, 
beam:schematransform:org.apache.beam:tfrecord_write:v1, 
beam:transform:org.apache.beam:kafka_read_without_metadata:v1, 
beam:transform:managed:v1, beam:transform:reshuffle:v1, 
beam:transform:redistribute_arbitrarily:v1]
   INFO:apache_beam.utils.subprocess_server:
   INFO:apache_beam.utils.subprocess_server:Registered transforms:
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_write:v2: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1755e85b
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_read_with_metadata:v2: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@736d6a5c
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_write:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@2371aaca
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:combine_grouped_values:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5b529706
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:iceberg_read:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@63fdab07
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:combine_globally:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7b5a12ae
   INFO:apache_beam.utils.subprocess_server:       
beam:external:java:generate_sequence:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@5553d0f5
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:redistribute_by_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1af687fe
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:window_into:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@14dda234
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:kafka_read:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@3f390d63
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:kafka_write:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@74a6a609
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:iceberg_cdc_read:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5a411614
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:combine_per_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@2374d36a
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_read_with_metadata:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@54d18072
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:group_by_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1506f20f
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:group_into_batches:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@47a5b70d
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:group_into_batches_with_sharded_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@424fd310
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:create_view:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1a45193b
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:teststream:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@38f116f6
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:sdf_process_keyed_elements:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5286c33a
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:iceberg_write:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6e6d5d29
   INFO:apache_beam.utils.subprocess_server:       beam:transform:flatten:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5c530d1e
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:tfrecord_read:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6c25e6c4
   INFO:apache_beam.utils.subprocess_server:       beam:transform:impulse:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@85e6769
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:write_files:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@c5ee75e
   INFO:apache_beam.utils.subprocess_server:       
beam:runners_core:transforms:splittable_process:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@48a12036
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:tfrecord_write:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@bf1ec20
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_read_without_metadata:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@70efb718
   INFO:apache_beam.utils.subprocess_server:       beam:transform:managed:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@b70da4c
   INFO:apache_beam.utils.subprocess_server:       beam:transform:reshuffle:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4a11eb84
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:redistribute_arbitrarily:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4e858e0a
   INFO:apache_beam.utils.subprocess_server:
   INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:yaml:filter-java:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:yaml:flatten:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:iceberg_read:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:yaml:log_for_testing:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:iceberg_write:v1
   INFO:apache_beam.utils.subprocess_server:       beam:test_schematransform:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:kafka_read:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:kafka_write:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:tfrecord_read:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:tfrecord_write:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:iceberg_cdc_read:v1
   INFO:apache_beam.utils.subprocess_server:       beam:transform:managed:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:generate_sequence:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:yaml:explode:v1
   WARNING:root:Waiting for grpc channel to be ready at localhost:60293.
   INFO:apache_beam.utils.subprocess_server:Using cached job server jar from 
https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.65.0/beam-sdks-java-io-expansion-service-2.65.0.jar
   INFO:root:Starting a JAR-based expansion service from JAR 
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar
   INFO:root:Starting a JAR-based expansion service from JAR 
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar
   INFO:apache_beam.utils.subprocess_server:May 30, 2025 4:28:28 PM 
org.apache.beam.sdk.expansion.service.ExpansionService expand
   INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Write to 
Kafka/beam:schematransform:org.apache.beam:kafka_write:v1' with URN 
'beam:expansion:payload:schematransform:v1'
   Traceback (most recent call last):
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 507, in expand_leaf_transform
       outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pvalue.py",
 line 138, in __or__
       return self.pipeline.apply(ptransform, self)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 723, in apply
       return self.apply(
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 734, in apply
       return self.apply(transform, pvalueish)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 801, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 424, in recording_expand
       result = original_expand(pvalue)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/transforms/external.py",
 line 504, in expand
       return pcolls | self._payload_builder.identifier() >> ExternalTransform(
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pvalue.py",
 line 138, in __or__
       return self.pipeline.apply(ptransform, self)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 723, in apply
       return self.apply(
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 734, in apply
       return self.apply(transform, pvalueish)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 801, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/transforms/external.py",
 line 827, in expand
       raise RuntimeError(_sanitize_java_traceback(response.error))
   RuntimeError: 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: Failed to decode Schema due to an error 
decoding Field proto:
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:140)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$1(ExpansionService.java:620)
           at 
java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
           at 
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
           at 
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1625)
           at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
           at 
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
           at 
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1650)
           at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
           at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
           at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
           at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:615)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
           at 
org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
           at 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
           at 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
           at 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
           at 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: Failed to decode Schema due to an error 
decoding Field proto:
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents.getCoder(RehydratedComponents.java:169)
           at 
org.apache.beam.sdk.util.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:109)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:99)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
           ... 27 more
   Caused by: java.lang.IllegalArgumentException: Failed to decode Schema due 
to an error decoding Field proto:
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:305)
           at 
org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:170)
           at 
org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:152)
           at 
org.apache.beam.sdk.util.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:192)
           at 
org.apache.beam.sdk.util.construction.CoderTranslation.fromProto(CoderTranslation.java:170)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:86)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:81)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
           ... 38 more
   Caused by: java.lang.IllegalArgumentException: Unexpected type_info: 
TYPEINFO_NOT_SET
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:503)
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:500)
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:333)
           at 
org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:303)
           ... 48 more
   
   
   
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: Failed to decode Schema due to an error 
decoding Field proto:
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
       return _run_code(code, main_globals, None,
     File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
       exec(code, run_globals)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/main.py",
 line 306, in <module>
       run()
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/main.py",
 line 156, in run
       constructor(p)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/main.py",
 line 290, in constructor
       yaml_transform.expand_pipeline(
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 1136, in expand_pipeline
       providers or {})).expand(root)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 1097, in expand
       result = expand_transform(
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 479, in expand_transform
       return expand_composite_transform(spec, scope)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 562, in expand_composite_transform
       return transform.expand(None)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 545, in expand
       inner_scope.compute_all()
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 205, in compute_all
       self.compute_outputs(transform_id)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 103, in wrapper
       self._cache[key] = func(self, *args)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 241, in compute_outputs
       return expand_transform(self._transforms_by_uuid[transform_id], self)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 481, in expand_transform
       return expand_leaf_transform(spec, scope)
     File 
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
 line 509, in expand_leaf_transform
       raise ValueError(
   ValueError: Error applying transform "Write to Kafka" at line 21: 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: Failed to decode Schema due to an error 
decoding Field proto:
   
   name: "value"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical:pythonsdk_any:v1"
     }
   }
   
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
           at 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
           at 
org.apache.beam.sdk.util.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:140)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$1(ExpansionService.java:620)
           at 
java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
           at 
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
           at 
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1625)
           at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
           at 
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
           at 
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1650)
           at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
           at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
           at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
           at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:615)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:

name: "value"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}

        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
        at org.apache.beam.sdk.util.construction.RehydratedComponents.getCoder(RehydratedComponents.java:169)
        at org.apache.beam.sdk.util.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
        at org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:109)
        at org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:99)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
        ... 27 more
Caused by: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:

name: "value"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}

        at org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:305)
        at org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:170)
        at org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:152)
        at org.apache.beam.sdk.util.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:192)
        at org.apache.beam.sdk.util.construction.CoderTranslation.fromProto(CoderTranslation.java:170)
        at org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:86)
        at org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:81)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
        ... 38 more
Caused by: java.lang.IllegalArgumentException: Unexpected type_info: TYPEINFO_NOT_SET
        at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:503)
        at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
        at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:500)
        at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
        at org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:333)
        at org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:303)
        ... 48 more

org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:

name: "value"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}

name: "value"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}

name: "value"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}
   ```
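
   The root cause in the trace is that the `value` field produced by the Python callable is inferred as the Python-only `Any` logical type (`beam:logical:pythonsdk_any:v1`), which the Java expansion service cannot translate (`Unexpected type_info: TYPEINFO_NOT_SET`). As a sketch of a possible workaround, not a confirmed fix, pinning each field to a concrete type should keep `Any` out of the inferred schema. The per-field `output_type` key below is an assumption based on Beam YAML's mapping documentation and should be verified against the SDK version in use:

   ```
   # Hypothetical sketch: declare concrete output types so schema inference
   # does not fall back to the Python Any logical type.
   - type: MapToFields
     name: Build Kafka records
     config:
       language: python
       fields:
         key:
           expression: b''
           output_type: bytes   # assumed key; check the YAML mapping docs
         value:
           callable: |
             def func(row):
               return row.line.encode('utf-8')
           output_type: bytes
   ```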
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [x] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

