+dev <dev@beam.apache.org> +Brian Hulette <bhule...@google.com> +Reuven Lax <re...@google.com>

On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote:

> Hi all,
>
> I think the method AvroUtils.toBeamSchema has an unexpected side effect.
> I found that if you invoke it and then run a pipeline of GenericRecords
> containing a timestamp (I tried the logical type timestamp-millis), Beam
> converts that timestamp from a long to an org.joda.time.DateTime, even if
> you don't apply any schema-related transformation in the pipeline.
> Do you think it's a bug?
>
> Below is a simple test class I wrote to reproduce the problem.
> The first test passes, while the second fails.
>
>
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericRecordBuilder;
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.schemas.utils.AvroUtils;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.SerializableFunction;
> import org.junit.Rule;
>
> import java.sql.Timestamp;
>
> import static org.junit.Assert.assertEquals;
>
> public class AvroUtilsSideEffect {
>
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>     @Rule
>     public final transient TestPipeline pipeline2 = TestPipeline.create();
>     public final Schema testSchema = SchemaBuilder
>             .record("record").namespace("test")
>             .fields()
>             .name("timestamp").type().longBuilder()
>                 .prop("logicalType", "timestamp-millis").endLong().noDefault()
>             .endRecord();
>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
>             .set("timestamp", new Timestamp(1563926400000L).getTime())
>             .build();
>
>
>     @org.junit.Test
>     public void test() {
>         // Baseline: AvroUtils is never used here, and the field stays a Long.
>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>                 .apply(Combine.globally(new TestFn()));
>
>         pipeline.run().waitUntilFinish();
>     }
>     @org.junit.Test
>     public void test2() {
>
>         // The returned Beam schema is discarded; the call alone triggers the change.
>         AvroUtils.toBeamSchema(testSchema);
>
>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>                 .apply(Combine.globally(new TestFn()));
>
>         pipeline2.run().waitUntilFinish();
>     }
>
>     public static class TestFn
>             implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
>
>         @Override
>         public GenericRecord apply(Iterable<GenericRecord> input) {
>             for (GenericRecord item : input) {
>                 if (item != null) {
>                     // Fails in test2(): the value arrives as an org.joda.time.DateTime.
>                     assertEquals(Long.class, item.get("timestamp").getClass());
>                     assertEquals(1563926400000L, item.get("timestamp"));
>                 }
>                 return item;
>             }
>             return null;
>         }
>     }
> }
>
> Thanks,
> Paolo
>
> --
> Paolo Tomeo, PhD
>
> Big Data and Machine Learning Engineer
>
> linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>
>
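One plausible explanation, offered as an unconfirmed assumption rather than a diagnosis: AvroUtils may register a logical-type conversion on a process-wide Avro singleton (for example `GenericData.get()`) when the class is first initialized, so every later decode in the same JVM, presumably including the one AvroCoder performs, starts producing DateTime values. The stdlib-only Java sketch below reproduces that pattern in miniature. `SharedRegistry`, `SchemaUtils` and `StaticInitSideEffect` are hypothetical stand-ins, not Beam or Avro classes, and `java.time.Instant` replaces `org.joda.time.DateTime` to keep the example dependency-free.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a process-wide singleton such as Avro's GenericData.get().
final class SharedRegistry {
  private static final SharedRegistry INSTANCE = new SharedRegistry();
  private final List<UnaryOperator<Object>> conversions = new ArrayList<>();

  static SharedRegistry get() {
    return INSTANCE;
  }

  void addConversion(UnaryOperator<Object> conversion) {
    conversions.add(conversion);
  }

  // "Decodes" a raw value by applying every registered conversion in order.
  Object decode(Object raw) {
    Object value = raw;
    for (UnaryOperator<Object> conversion : conversions) {
      value = conversion.apply(value);
    }
    return value;
  }
}

// Hypothetical stand-in for a utility class whose static initializer mutates the
// shared registry. The initializer runs the first time a static method is called.
final class SchemaUtils {
  static {
    SharedRegistry.get().addConversion(
        v -> v instanceof Long ? Instant.ofEpochMilli((Long) v) : v);
  }

  static String toOtherSchema(String schema) {
    return schema; // the return value is irrelevant; only class initialization matters
  }
}

public class StaticInitSideEffect {
  public static void main(String[] args) {
    Object before = SharedRegistry.get().decode(1563926400000L);
    SchemaUtils.toOtherSchema("test"); // result discarded, as in test2()
    Object after = SharedRegistry.get().decode(1563926400000L);

    System.out.println(before.getClass().getSimpleName()); // Long
    System.out.println(after.getClass().getSimpleName());  // Instant
  }
}
```

If this hypothesis holds, the effect lasts for the lifetime of the class loader, so within a single JVM the outcome of `test()` could also depend on whether it runs before or after `test2()`.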
