+dev <dev@beam.apache.org> +Brian Hulette <bhule...@google.com> +Reuven Lax <re...@google.com>
On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote: > Hi all, > > I think the method AvroUtils.toBeamSchema has an unexpected side effect. > I found out that, if you invoke it and then you run a pipeline of > GenericRecords containing a timestamp (I tried with logical-type > timestamp-millis), Beam converts that timestamp from long to > org.joda.time.DateTime. Even if you don't apply any transformation to the > pipeline. > Do you think it's a bug? > > Below you can find a simple test class I wrote in order to replicate the > problem. > The first test passes while the second fails. > > > import org.apache.avro.Schema; > import org.apache.avro.SchemaBuilder; > import org.apache.avro.generic.GenericRecord; > import org.apache.avro.generic.GenericRecordBuilder; > import org.apache.beam.sdk.coders.AvroCoder; > import org.apache.beam.sdk.schemas.utils.AvroUtils; > import org.apache.beam.sdk.testing.TestPipeline; > import org.apache.beam.sdk.transforms.Combine; > import org.apache.beam.sdk.transforms.Create; > import org.apache.beam.sdk.transforms.SerializableFunction; > import org.junit.Rule; > > import java.sql.Timestamp; > > import static org.junit.Assert.assertEquals; > > public class AvroUtilsSideEffect { > > @Rule > public final transient TestPipeline pipeline = TestPipeline.create(); > @Rule > public final transient TestPipeline pipeline2 = TestPipeline.create(); > public final Schema testSchema = SchemaBuilder > .record("record").namespace("test") > .fields() > .name("timestamp").type().longBuilder().prop("logicalType", > "timestamp-millis").endLong().noDefault() > .endRecord(); > public final GenericRecord record = new GenericRecordBuilder(testSchema) > .set("timestamp", new Timestamp(1563926400000L).getTime()) > .build(); > > > @org.junit.Test > public void test() { > pipeline.apply( Create.of(record).withCoder(AvroCoder.of(testSchema))) > .apply( Combine.globally(new TestFn())); > > pipeline.run().waitUntilFinish(); > } > @org.junit.Test > 
public void test2() { > > AvroUtils.toBeamSchema(testSchema); > > pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema))) > .apply(Combine.globally(new TestFn())); > > pipeline2.run().waitUntilFinish(); > } > > public static class TestFn implements > SerializableFunction<Iterable<GenericRecord>, GenericRecord> { > > @Override > public GenericRecord apply(Iterable<GenericRecord> input) { > for (GenericRecord item : input) { > if(item != null){ > assertEquals(Long.class, > item.get("timestamp").getClass()); > assertEquals(1563926400000L, item.get("timestamp")); > } > return item; > } > return null; > } > } > } > > Thanks, > Paolo > > -- > Paolo Tomeo, PhD > > Big Data and Machine Learning Engineer > > linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo> >