charlespnh opened a new issue, #35099:
URL: https://github.com/apache/beam/issues/35099
### What happened?
I'm creating a very simple YAML pipeline to write to Kafka (`apache-beam
2.65.0`):
```
python -m apache_beam.yaml.main
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_write.yaml
```
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromText
      name: Read from GCS
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt
    - type: MapToFields
      name: Build Kafka records
      config:
        language: python
        fields:
          key: b''
          value:
            callable: |
              def func(row):
                return row.line.encode('utf-8')
    - type: WriteToKafka
      name: Write to Kafka
      config:
        format: RAW
        topic: my-topic
        bootstrap_servers: kafka:9092
```
Since `WriteToKafka` with `format: RAW` expects KV records, the `MapToFields`
transform is there to do the additional transformation needed to build those
records before writing to Kafka. But I got this error (towards the end):
```
(env) root@2bf12866314d:/home/beam# python -m apache_beam.yaml.main
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_write.yaml
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
Building pipeline...
INFO:apache_beam.yaml.yaml_transform:Expanding "Read from GCS" at line 5
INFO:apache_beam.yaml.yaml_transform:Expanding "Build Kafka records" at line
10
INFO:apache_beam.yaml.yaml_transform:Expanding "Write to Kafka" at line 21
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.65.0/beam-sdks-java-io-expansion-service-2.65.0.jar
INFO:root:Starting a JAR-based expansion service from JAR
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
'-jar'
'/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar'
'60293'
'--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar'
'--alsoStartLoopbackWorker']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at
localhost:60293
INFO:apache_beam.utils.subprocess_server:May 30, 2025 4:28:25 PM
org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external
transforms: [beam:transform:org.apache.beam:kafka_write:v2,
beam:transform:org.apache.beam:kafka_read_with_metadata:v2,
beam:transform:org.apache.beam:kafka_write:v1,
beam:transform:combine_grouped_values:v1,
beam:schematransform:org.apache.beam:iceberg_read:v1,
beam:transform:combine_globally:v1, beam:external:java:generate_sequence:v1,
beam:transform:redistribute_by_key:v1, beam:transform:window_into:v1,
beam:schematransform:org.apache.beam:kafka_read:v1,
beam:schematransform:org.apache.beam:kafka_write:v1,
beam:schematransform:org.apache.beam:iceberg_cdc_read:v1,
beam:transform:combine_per_key:v1,
beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
beam:transform:group_by_key:v1, beam:transform:group_into_batches:v1,
beam:transform:group_into_batches_with_sharded_key:v1,
beam:transform:create_view:v1, beam:transform:teststream:v1,
beam:transform:sdf_process_keyed_elements:v1, beam:schematransform:org.ap
ache.beam:iceberg_write:v1, beam:transform:flatten:v1,
beam:schematransform:org.apache.beam:tfrecord_read:v1,
beam:transform:impulse:v1, beam:transform:write_files:v1,
beam:runners_core:transforms:splittable_process:v1,
beam:schematransform:org.apache.beam:tfrecord_write:v1,
beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
beam:transform:managed:v1, beam:transform:reshuffle:v1,
beam:transform:redistribute_arbitrarily:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_write:v2:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1755e85b
INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_read_with_metadata:v2:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@736d6a5c
INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@2371aaca
INFO:apache_beam.utils.subprocess_server:
beam:transform:combine_grouped_values:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5b529706
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:iceberg_read:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@63fdab07
INFO:apache_beam.utils.subprocess_server:
beam:transform:combine_globally:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7b5a12ae
INFO:apache_beam.utils.subprocess_server:
beam:external:java:generate_sequence:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@5553d0f5
INFO:apache_beam.utils.subprocess_server:
beam:transform:redistribute_by_key:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1af687fe
INFO:apache_beam.utils.subprocess_server:
beam:transform:window_into:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@14dda234
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:kafka_read:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@3f390d63
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:kafka_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@74a6a609
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:iceberg_cdc_read:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5a411614
INFO:apache_beam.utils.subprocess_server:
beam:transform:combine_per_key:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@2374d36a
INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@54d18072
INFO:apache_beam.utils.subprocess_server:
beam:transform:group_by_key:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1506f20f
INFO:apache_beam.utils.subprocess_server:
beam:transform:group_into_batches:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@47a5b70d
INFO:apache_beam.utils.subprocess_server:
beam:transform:group_into_batches_with_sharded_key:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@424fd310
INFO:apache_beam.utils.subprocess_server:
beam:transform:create_view:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1a45193b
INFO:apache_beam.utils.subprocess_server:
beam:transform:teststream:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@38f116f6
INFO:apache_beam.utils.subprocess_server:
beam:transform:sdf_process_keyed_elements:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5286c33a
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:iceberg_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6e6d5d29
INFO:apache_beam.utils.subprocess_server: beam:transform:flatten:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5c530d1e
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:tfrecord_read:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6c25e6c4
INFO:apache_beam.utils.subprocess_server: beam:transform:impulse:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@85e6769
INFO:apache_beam.utils.subprocess_server:
beam:transform:write_files:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@c5ee75e
INFO:apache_beam.utils.subprocess_server:
beam:runners_core:transforms:splittable_process:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@48a12036
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:tfrecord_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@bf1ec20
INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@70efb718
INFO:apache_beam.utils.subprocess_server: beam:transform:managed:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@b70da4c
INFO:apache_beam.utils.subprocess_server: beam:transform:reshuffle:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4a11eb84
INFO:apache_beam.utils.subprocess_server:
beam:transform:redistribute_arbitrarily:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4e858e0a
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:yaml:filter-java:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:yaml:flatten:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:iceberg_read:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:yaml:log_for_testing:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:iceberg_write:v1
INFO:apache_beam.utils.subprocess_server: beam:test_schematransform:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:kafka_read:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:kafka_write:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:tfrecord_read:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:tfrecord_write:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:iceberg_cdc_read:v1
INFO:apache_beam.utils.subprocess_server: beam:transform:managed:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:generate_sequence:v1
INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:yaml:explode:v1
WARNING:root:Waiting for grpc channel to be ready at localhost:60293.
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.65.0/beam-sdks-java-io-expansion-service-2.65.0.jar
INFO:root:Starting a JAR-based expansion service from JAR
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar
INFO:root:Starting a JAR-based expansion service from JAR
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.65.0.jar
INFO:apache_beam.utils.subprocess_server:May 30, 2025 4:28:28 PM
org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Write to
Kafka/beam:schematransform:org.apache.beam:kafka_write:v1' with URN
'beam:expansion:payload:schematransform:v1'
Traceback (most recent call last):
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 507, in expand_leaf_transform
outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pvalue.py",
line 138, in __or__
return self.pipeline.apply(ptransform, self)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 723, in apply
return self.apply(
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 734, in apply
return self.apply(transform, pvalueish)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 801, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
line 191, in apply
return self.apply_PTransform(transform, input, options)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
line 195, in apply_PTransform
return transform.expand(input)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 424, in recording_expand
result = original_expand(pvalue)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/transforms/external.py",
line 504, in expand
return pcolls | self._payload_builder.identifier() >> ExternalTransform(
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pvalue.py",
line 138, in __or__
return self.pipeline.apply(ptransform, self)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 723, in apply
return self.apply(
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 734, in apply
return self.apply(transform, pvalueish)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 801, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
line 191, in apply
return self.apply_PTransform(transform, input, options)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/runners/runner.py",
line 195, in apply_PTransform
return transform.expand(input)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/transforms/external.py",
line 827, in expand
raise RuntimeError(_sanitize_java_traceback(response.error))
RuntimeError:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Failed to decode Schema due to an error
decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at
org.apache.beam.sdk.util.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:140)
at
org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$1(ExpansionService.java:620)
at
java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
at
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1625)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1650)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:615)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
at
org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Failed to decode Schema due to an error
decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at
org.apache.beam.sdk.util.construction.RehydratedComponents.getCoder(RehydratedComponents.java:169)
at
org.apache.beam.sdk.util.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:109)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:99)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
... 27 more
Caused by: java.lang.IllegalArgumentException: Failed to decode Schema due
to an error decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
at
org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:305)
at
org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:170)
at
org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:152)
at
org.apache.beam.sdk.util.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:192)
at
org.apache.beam.sdk.util.construction.CoderTranslation.fromProto(CoderTranslation.java:170)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:86)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:81)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
... 38 more
Caused by: java.lang.IllegalArgumentException: Unexpected type_info:
TYPEINFO_NOT_SET
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:503)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:500)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:333)
at
org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:303)
... 48 more
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Failed to decode Schema due to an error
decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/main.py",
line 306, in <module>
run()
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/main.py",
line 156, in run
constructor(p)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/main.py",
line 290, in constructor
yaml_transform.expand_pipeline(
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 1136, in expand_pipeline
providers or {})).expand(root)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 1097, in expand
result = expand_transform(
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 479, in expand_transform
return expand_composite_transform(spec, scope)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 562, in expand_composite_transform
return transform.expand(None)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 545, in expand
inner_scope.compute_all()
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 205, in compute_all
self.compute_outputs(transform_id)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 103, in wrapper
self._cache[key] = func(self, *args)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 241, in compute_outputs
return expand_transform(self._transforms_by_uuid[transform_id], self)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 481, in expand_transform
return expand_leaf_transform(spec, scope)
File
"/home/beam/scripts/kafka-iceberg-spark/local/env/lib/python3.10/site-packages/apache_beam/yaml/yaml_transform.py",
line 509, in expand_leaf_transform
raise ValueError(
ValueError: Error applying transform "Write to Kafka" at line 21:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Failed to decode Schema due to an error
decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at
org.apache.beam.sdk.util.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:140)
at
org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$1(ExpansionService.java:620)
at
java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
at
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1625)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at
java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1650)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:615)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
at
org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Failed to decode Schema due to an error
decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at
org.apache.beam.sdk.util.construction.RehydratedComponents.getCoder(RehydratedComponents.java:169)
at
org.apache.beam.sdk.util.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:109)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$3.load(RehydratedComponents.java:99)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
... 27 more
Caused by: java.lang.IllegalArgumentException: Failed to decode Schema due
to an error decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
at
org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:305)
at
org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:170)
at
org.apache.beam.sdk.util.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:152)
at
org.apache.beam.sdk.util.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:192)
at
org.apache.beam.sdk.util.construction.CoderTranslation.fromProto(CoderTranslation.java:170)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:86)
at
org.apache.beam.sdk.util.construction.RehydratedComponents$2.load(RehydratedComponents.java:81)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
... 38 more
Caused by: java.lang.IllegalArgumentException: Unexpected type_info:
TYPEINFO_NOT_SET
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:503)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:500)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:340)
at
org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:333)
at
org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:303)
... 48 more
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Failed to decode Schema due to an error
decoding Field proto:
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
name: "value"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
```
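For what it's worth, the Field proto in the error shows the `value` field was inferred as `Any` (`beam:logical:pythonsdk_any:v1`), a Python-SDK-specific logical type that the Java expansion service apparently cannot decode. One workaround I would try (untested, and I'm not sure the YAML callable parser actually honors return annotations, nor whether `key` must also become a callable) is annotating the callables' return types so schema inference can produce concrete `bytes` fields instead of `Any`:

```
- type: MapToFields
  name: Build Kafka records
  config:
    language: python
    fields:
      # Hypothetical: key rewritten as an annotated callable instead of the b'' expression
      key:
        callable: |
          def func(row) -> bytes:
            return b''
      value:
        callable: |
          def func(row) -> bytes:
            return row.line.encode('utf-8')
```

If inference still produces `Any`, the bug presumably needs a fix on the cross-language schema translation side rather than in the pipeline definition.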
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [x] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner