It seems likely this is a side effect of some static initialization in AvroUtils: https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
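To illustrate the kind of mechanism I suspect, here is a minimal sketch in plain Java with no Beam or Avro on the classpath. `Registry` and `Utils` are hypothetical stand-ins I made up for a process-wide singleton and for a utility class with a static initializer; they are not the real classes, and I am only assuming the real behavior follows the same pattern.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StaticInitSideEffect {

    /** Hypothetical stand-in for a process-wide singleton that holds conversions. */
    static final class Registry {
        private static final Map<String, Function<Long, Object>> CONVERSIONS = new HashMap<>();

        static void register(String logicalType, Function<Long, Object> conversion) {
            CONVERSIONS.put(logicalType, conversion);
        }

        /** Returns the raw long unless a conversion was registered for the logical type. */
        static Object read(String logicalType, long raw) {
            Function<Long, Object> conversion = CONVERSIONS.get(logicalType);
            return conversion == null ? (Object) Long.valueOf(raw) : conversion.apply(raw);
        }
    }

    /** Hypothetical stand-in for a utility class whose static initializer mutates the registry. */
    static final class Utils {
        static {
            // Runs once, the first time any static member of Utils is used.
            Registry.register("timestamp-millis", Instant::ofEpochMilli);
        }

        static String describe(String logicalType) {
            return "schema:" + logicalType;
        }
    }

    public static void main(String[] args) {
        long raw = 1563926400000L;

        Object before = Registry.read("timestamp-millis", raw);
        Utils.describe("timestamp-millis"); // unrelated call, but it triggers the static block
        Object after = Registry.read("timestamp-millis", raw);

        System.out.println(before.getClass().getSimpleName()); // prints "Long"
        System.out.println(after.getClass().getSimpleName());  // prints "Instant"
    }
}
```

In this sketch the read path never references `Utils`, yet its result changes once `Utils` has been initialized anywhere in the same JVM. If the real cause is similar, the outcome of a test would depend on whether the class was already loaded before the pipeline ran.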

On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <[email protected]> wrote:

> I've copied this failing test into my client, and it passes for me. I
> can't reproduce the failure.
>
> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <[email protected]> wrote:
>
>> +dev <[email protected]> +Brian Hulette <[email protected]> +Reuven
>> Lax <[email protected]>
>>
>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I think the method AvroUtils.toBeamSchema has a not expected side
>>> effect.
>>> I found out that, if you invoke it and then you run a pipeline of
>>> GenericRecords containing a timestamp (I tried with logical-type
>>> timestamp-millis), Beam converts such timestamp from long to
>>> org.joda.time.DateTime. Even if you don't apply any transformation to the
>>> pipeline.
>>> Do you think it's a bug?
>>>
>>> Below you can find a simple test class I wrote in order to replicate the
>>> problem.
>>> The first test passes while the second fails.
>>>
>>>
>>> import org.apache.avro.Schema;
>>> import org.apache.avro.SchemaBuilder;
>>> import org.apache.avro.generic.GenericRecord;
>>> import org.apache.avro.generic.GenericRecordBuilder;
>>> import org.apache.beam.sdk.coders.AvroCoder;
>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
>>> import org.apache.beam.sdk.testing.TestPipeline;
>>> import org.apache.beam.sdk.transforms.Combine;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.transforms.SerializableFunction;
>>> import org.junit.Rule;
>>>
>>> import java.sql.Timestamp;
>>>
>>> import static org.junit.Assert.assertEquals;
>>>
>>> public class AvroUtilsSideEffect {
>>>
>>>     @Rule
>>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>>     @Rule
>>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
>>>     public final Schema testSchema = SchemaBuilder
>>>             .record("record").namespace("test")
>>>             .fields()
>>>             .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
>>>             .endRecord();
>>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
>>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
>>>             .build();
>>>
>>>
>>>     @org.junit.Test
>>>     public void test() {
>>>         pipeline.apply(
>>>                 Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>>                 .apply(Combine.globally(new TestFn()));
>>>
>>>         pipeline.run().waitUntilFinish();
>>>     }
>>>
>>>     @org.junit.Test
>>>     public void test2() {
>>>
>>>         AvroUtils.toBeamSchema(testSchema);
>>>
>>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>>                 .apply(Combine.globally(new TestFn()));
>>>
>>>         pipeline2.run().waitUntilFinish();
>>>     }
>>>
>>>     public static class TestFn implements
>>>             SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
>>>
>>>         @Override
>>>         public GenericRecord apply(Iterable<GenericRecord> input) {
>>>             for (GenericRecord item : input) {
>>>                 if (item != null) {
>>>                     assertEquals(Long.class,
>>>                             item.get("timestamp").getClass());
>>>                     assertEquals(1563926400000L, item.get("timestamp"));
>>>                 }
>>>                 return item;
>>>             }
>>>             return null;
>>>         }
>>>     }
>>> }
>>>
>>> Thanks,
>>> Paolo
>>>
>>> --
>>> Paolo Tomeo, PhD
>>>
>>> Big Data and Machine Learning Engineer
>>>
>>> linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>
