I dug deeper and found that this global static change has been there since the beginning of the Avro / Beam Schema support (Beam 2.15.0): https://github.com/apache/beam/commit/2a40c576cfb
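A minimal, stdlib-only Java model of the suspected mechanism may help here. It assumes, going by Brian's pointer and Paolo's report that the decoded value changes from long to org.joda.time.DateTime, that AvroUtils' static initializer registers a timestamp-millis conversion on a JVM-wide shared data model (in Avro, something like the GenericData.get() singleton). SharedDecoder and SchemaUtils are hypothetical stand-ins, not Beam or Avro classes, and java.time.Instant stands in for Joda's DateTime.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of a static-initializer side effect; these are NOT Beam or Avro classes.
public class StaticInitSideEffect {

  // Stand-in for a JVM-wide data model singleton (in Avro, something like GenericData.get()).
  static final class SharedDecoder {
    private static final SharedDecoder INSTANCE = new SharedDecoder();
    private final Map<String, Function<Long, Object>> conversions = new ConcurrentHashMap<>();

    static SharedDecoder get() {
      return INSTANCE;
    }

    void addConversion(String logicalType, Function<Long, Object> conversion) {
      conversions.put(logicalType, conversion);
    }

    // Returns the raw long unless a conversion is registered for the logical type.
    Object decode(String logicalType, long raw) {
      Function<Long, Object> conversion = conversions.get(logicalType);
      return conversion == null ? raw : conversion.apply(raw);
    }
  }

  // Stand-in for a schema utility class such as AvroUtils (hypothetical).
  static final class SchemaUtils {
    static {
      // Runs once per class loader, the first time SchemaUtils is initialized,
      // and mutates the shared singleton for every other caller.
      SharedDecoder.get().addConversion("timestamp-millis", Instant::ofEpochMilli);
    }

    static String toOtherSchema(String schemaName) {
      // The conversion work is irrelevant here; calling any static method triggers the block.
      return schemaName;
    }
  }

  public static void main(String[] args) {
    long millis = 1563926400000L;

    Object before = SharedDecoder.get().decode("timestamp-millis", millis);
    System.out.println(before.getClass().getSimpleName() + " " + before); // Long 1563926400000

    SchemaUtils.toOtherSchema("record"); // first use: SchemaUtils' static initializer runs

    Object after = SharedDecoder.get().decode("timestamp-millis", millis);
    System.out.println(after.getClass().getSimpleName() + " " + after); // Instant 2019-07-24T00:00:00Z
  }
}
```

Because a class is initialized at most once per class loader, once any test touches the utility class, every later test in the same JVM sees the conversion; in a real test suite that would also make results depend on test execution order.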
On Thu, Apr 30, 2020 at 8:52 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>
> Created https://issues.apache.org/jira/browse/BEAM-9863 to track this.
> Any taker?
>
> On Thu, Apr 30, 2020 at 5:54 PM Reuven Lax <re...@google.com> wrote:
> >
> > I'm not sure who added that, but it's been there for a while. Making global
> > static changes like that in our module seems like poor form; I wonder if
> > there's a better approach.
> >
> > On Thu, Apr 30, 2020 at 8:36 AM Brian Hulette <bhule...@google.com> wrote:
> >>
> >> It seems likely this is a side effect of some static initialization in
> >> AvroUtils:
> >> https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
> >>
> >> On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <re...@google.com> wrote:
> >>>
> >>> I've copied this failing test into my client, and it passes for me. I
> >>> can't reproduce the failure.
> >>>
> >>> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> +dev +Brian Hulette +Reuven Lax
> >>>>
> >>>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I think the method AvroUtils.toBeamSchema has an unexpected side
> >>>>> effect.
> >>>>> I found that if you invoke it and then run a pipeline of
> >>>>> GenericRecords containing a timestamp (I tried with the logical type
> >>>>> timestamp-millis), Beam converts the timestamp from long to
> >>>>> org.joda.time.DateTime, even if you don't apply any transformation in
> >>>>> the pipeline.
> >>>>> Do you think it's a bug?
> >>>>>
> >>>>> Below you can find a simple test class I wrote to reproduce
> >>>>> the problem.
> >>>>> The first test passes while the second fails.
> >>>>>
> >>>>> import org.apache.avro.Schema;
> >>>>> import org.apache.avro.SchemaBuilder;
> >>>>> import org.apache.avro.generic.GenericRecord;
> >>>>> import org.apache.avro.generic.GenericRecordBuilder;
> >>>>> import org.apache.beam.sdk.coders.AvroCoder;
> >>>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
> >>>>> import org.apache.beam.sdk.testing.TestPipeline;
> >>>>> import org.apache.beam.sdk.transforms.Combine;
> >>>>> import org.apache.beam.sdk.transforms.Create;
> >>>>> import org.apache.beam.sdk.transforms.SerializableFunction;
> >>>>> import org.junit.Rule;
> >>>>>
> >>>>> import java.sql.Timestamp;
> >>>>>
> >>>>> import static org.junit.Assert.assertEquals;
> >>>>>
> >>>>> public class AvroUtilsSideEffect {
> >>>>>
> >>>>>   @Rule
> >>>>>   public final transient TestPipeline pipeline = TestPipeline.create();
> >>>>>   @Rule
> >>>>>   public final transient TestPipeline pipeline2 = TestPipeline.create();
> >>>>>
> >>>>>   public final Schema testSchema = SchemaBuilder
> >>>>>       .record("record").namespace("test")
> >>>>>       .fields()
> >>>>>       .name("timestamp").type().longBuilder()
> >>>>>           .prop("logicalType", "timestamp-millis").endLong().noDefault()
> >>>>>       .endRecord();
> >>>>>
> >>>>>   public final GenericRecord record = new GenericRecordBuilder(testSchema)
> >>>>>       .set("timestamp", new Timestamp(1563926400000L).getTime())
> >>>>>       .build();
> >>>>>
> >>>>>   @org.junit.Test
> >>>>>   public void test() {
> >>>>>     pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> >>>>>         .apply(Combine.globally(new TestFn()));
> >>>>>
> >>>>>     pipeline.run().waitUntilFinish();
> >>>>>   }
> >>>>>
> >>>>>   @org.junit.Test
> >>>>>   public void test2() {
> >>>>>     AvroUtils.toBeamSchema(testSchema);
> >>>>>
> >>>>>     pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> >>>>>         .apply(Combine.globally(new TestFn()));
> >>>>>
> >>>>>     pipeline2.run().waitUntilFinish();
> >>>>>   }
> >>>>>
> >>>>>   public static class TestFn
> >>>>>       implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
> >>>>>
> >>>>>     @Override
> >>>>>     public GenericRecord apply(Iterable<GenericRecord> input) {
> >>>>>       for (GenericRecord item : input) {
> >>>>>         if (item != null) {
> >>>>>           assertEquals(Long.class, item.get("timestamp").getClass());
> >>>>>           assertEquals(1563926400000L, item.get("timestamp"));
> >>>>>         }
> >>>>>         return item;
> >>>>>       }
> >>>>>       return null;
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>> Thanks,
> >>>>> Paolo
> >>>>>
> >>>>> --
> >>>>> Paolo Tomeo, PhD
> >>>>>
> >>>>> Big Data and Machine Learning Engineer
> >>>>>
> >>>>> linkedin.com/in/ptomeo
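One possible shape of the "better approach" Reuven asks about, sketched with hypothetical stand-ins (Decoder, SchemaUtils, DEFAULT are not Beam or Avro API): the utility owns a private, pre-configured decoder instead of mutating a shared default, so using the utility cannot change what unrelated code decodes. In Avro terms this would roughly mean configuring a dedicated data-model instance rather than the global one; whether that fits Beam's coders is an assumption, not something established in this thread. java.time.Instant again stands in for Joda's DateTime.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of scoping a logical-type conversion; these are NOT Beam or Avro classes.
public class ScopedConversions {

  // Immutable decoder: its conversions are fixed at construction time.
  static final class Decoder {
    private final Map<String, Function<Long, Object>> conversions;

    Decoder(Map<String, Function<Long, Object>> conversions) {
      this.conversions = Collections.unmodifiableMap(new HashMap<>(conversions));
    }

    // Returns the raw long unless this instance has a conversion for the logical type.
    Object decode(String logicalType, long raw) {
      Function<Long, Object> conversion = conversions.get(logicalType);
      return conversion == null ? raw : conversion.apply(raw);
    }
  }

  // Shared default used by unrelated code (e.g. a plain record coder); it cannot be mutated.
  static final Decoder DEFAULT = new Decoder(Collections.emptyMap());

  // Utility class that keeps its conversion private instead of registering it globally.
  static final class SchemaUtils {
    private static final Function<Long, Object> TO_INSTANT = Instant::ofEpochMilli;
    private static final Decoder SCHEMA_DECODER =
        new Decoder(Collections.singletonMap("timestamp-millis", TO_INSTANT));

    static Object decodeForSchemaWork(String logicalType, long raw) {
      return SCHEMA_DECODER.decode(logicalType, raw);
    }
  }

  public static void main(String[] args) {
    long millis = 1563926400000L;
    System.out.println(SchemaUtils.decodeForSchemaWork("timestamp-millis", millis)); // 2019-07-24T00:00:00Z
    // Using SchemaUtils above did not change the shared default.
    System.out.println(DEFAULT.decode("timestamp-millis", millis)); // 1563926400000
  }
}
```

The trade-off is that every caller needing the conversion must be handed the configured instance explicitly, but nothing depends on class-initialization order or on which test happens to run first.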