I am getting apparently non-fatal ClassCastExceptions for an enum, but only
when the pipeline runs on the DataflowRunner. On the DirectRunner it works just
fine. The full stack trace (provided at the end, as it is lengthy) references
none of my code other than the class name of the enum. (Usually my first stop
is looking for problems in my own code, because, you know, I wrote it.)
Overview of the pipeline:
Streaming pipeline: Pubsub -> wrapper class for a Protobuf-generated class ->
some transforms into another Protobuf class wrapper -> ParquetIO to GCS with
dynamic destinations.
I am creating the Avro schema by using the JavaBeans schema and translating it
to an Avro schema:
    beamSchema =
        JavaBeanUtils.schemaFromJavaBeanClass(
            EvaluatedIngestSourceWrapper.class,
            JavaBeanSchema.GetterTypeSupplier.INSTANCE);
    avroSchema = AvroUtils.toAvroSchema(beamSchema);
I then use that schema to create a GenericRecordBuilder, build the
GenericRecord, and return it to be serialized.
The ParquetIO step, where `flattened` is a
PCollection<EvaluatedIngestSourceWrapper>:
    flattened.apply(
        "Write to GCS",
        FileIO.<FileNamingData, EvaluatedIngestSourceWrapper>writeDynamic()
            .by(
                eisw -> {
                  assert eisw != null;
                  // Must provide this as a Static Value Provider created outside of
                  // the lambda, as otherwise it cannot be serialized
                  return new FileNamingData(eisw, windowSize);
                })
            .via(
                Contextful.fn(EvaluatedIngestSourceWrapper::asGenericRecord),
                ParquetIO.sink(avroSchema))
            .to("gs://" + options.getFilenamePrefix())
            .withNaming(FileNamingData::fileNamingForRow)
            .withDestinationCoder(
                pipeline.getSchemaRegistry().getSchemaCoder(FileNamingData.class))
            .withNumShards(options.getNumShards()));
This works perfectly under the DirectRunner, but when I deploy it to Dataflow
it racks up errors, though not for every record processed. I verified via the
debugger that the Avro schema ends up with this definition of the enum field:
    {
      "name" : "evalDupIdentifiersState",
      "type" : [ "null", {
        "type" : "enum",
        "name" : "evalDupIdentifiersState",
        "doc" : "",
        "symbols" : [ "potentialDuplicateIdentifier", "newIdentifier",
                      "duplicateIdentifier", "UNRECOGNIZED" ]
      }
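The same comparison can be done in code rather than by eye in the debugger. Below is a minimal sketch, assuming the schema's symbol list has already been extracted into a List<String>. EnumSymbolCheckSketch, State, and missingSymbols are hypothetical names used only for illustration; State stands in for the generated DupIdentifiersState enum.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustration only: all names here are hypothetical, not part of Beam or Avro.
class EnumSymbolCheckSketch {

    // Stand-in for the protobuf-generated DupIdentifiersState enum.
    enum State { potentialDuplicateIdentifier, newIdentifier, duplicateIdentifier, UNRECOGNIZED }

    // Returns the enum constant names that do not appear in the schema's symbol list.
    static <E extends Enum<E>> Set<String> missingSymbols(
            Class<E> enumClass, List<String> schemaSymbols) {
        Set<String> missing = new LinkedHashSet<>();
        for (E constant : enumClass.getEnumConstants()) {
            if (!schemaSymbols.contains(constant.name())) {
                missing.add(constant.name());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Symbols copied from the enum field definition shown above.
        List<String> symbols = Arrays.asList(
            "potentialDuplicateIdentifier", "newIdentifier",
            "duplicateIdentifier", "UNRECOGNIZED");
        System.out.println(missingSymbols(State.class, symbols)); // prints []
    }
}
```

An empty result only says the names line up; it says nothing about whether the worker is using the same schema.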
Full stack trace follows:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException: class ev24.protobuf.epIngest.v1.DupIdentifiersState cannot be cast to class java.lang.Number (ev24.protobuf.epIngest.v1.DupIdentifiersState is in unnamed module of loader 'app'; java.lang.Number is in module java.base of loader 'bootstrap')
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1060)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1063)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:934)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:795)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:81)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1430)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1109)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException: class ev24.protobuf.epIngest.v1.DupIdentifiersState cannot be cast to class java.lang.Number (ev24.protobuf.epIngest.v1.DupIdentifiersState is in unnamed module of loader 'app'; java.lang.Number is in module java.base of loader 'bootstrap')
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
    at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
Caused by: java.lang.ClassCastException: class ev24.protobuf.epIngest.v1.DupIdentifiersState cannot be cast to class java.lang.Number (ev24.protobuf.epIngest.v1.DupIdentifiersState is in unnamed module of loader 'app'; java.lang.Number is in module java.base of loader 'bootstrap')
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:329)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:284)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:197)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:171)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
    at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.write(ParquetIO.java:1063)
    at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.write(ParquetIO.java:1008)
    at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.write(FileIO.java:1382)
    at org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:584)
    at org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:116)
    at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:775)
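The innermost frame is a cast to java.lang.Number inside a writer that chooses its conversion from the schema's declared field type. Below is a minimal sketch of how that kind of failure can arise. It assumes, and this is only an assumption, that the schema in effect on the worker declares a numeric type for the field while the record still carries the enum object. It is a simplified model, not the parquet-avro source, and SchemaDispatchSketch, FieldType, State, and write are hypothetical names.

```java
// Simplified model of a schema-driven writer; all names are hypothetical.
class SchemaDispatchSketch {

    // Stand-in for the protobuf-generated DupIdentifiersState enum.
    enum State { potentialDuplicateIdentifier, newIdentifier, duplicateIdentifier, UNRECOGNIZED }

    // Stand-in for the type a schema declares for one field.
    enum FieldType { INT, ENUM }

    // Converts a value according to the declared type, not the value's runtime class.
    static String write(FieldType declared, Object value) {
        switch (declared) {
            case INT:
                // Numeric branch: the value is assumed to be a Number.
                return "int:" + ((Number) value).intValue();
            case ENUM:
                // Enum branch: only the symbol name is needed.
                return "enum:" + value.toString();
            default:
                throw new IllegalArgumentException("unsupported type " + declared);
        }
    }

    public static void main(String[] args) {
        // Declared type matches the value: the write succeeds.
        System.out.println(write(FieldType.ENUM, State.newIdentifier));
        try {
            // Declared type is numeric but the value is an enum: the cast fails.
            write(FieldType.INT, State.newIdentifier);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

In this model the enum branch never casts, so a Number cast on an enum value points at a mismatch between the declared field type and the object actually stored in the record.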
Andrew Kettmann
DevOps Engineer