Hi all,

I think the method AvroUtils.toBeamSchema has an unexpected side effect.
I found that, if you invoke it (even discarding the returned schema) and then
run a pipeline of GenericRecords containing a timestamp (I tried with the
logical type timestamp-millis), Beam converts that timestamp from a long to
an org.joda.time.DateTime, even though the pipeline applies no
transformation to the records themselves.
Do you think it's a bug?

Below you can find a simple test class I wrote to reproduce the problem.
The first test (test) passes, while the second (test2), which calls
AvroUtils.toBeamSchema before building its pipeline, fails.


import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.junit.Rule;

import java.sql.Timestamp;

import static org.junit.Assert.assertEquals;

public class AvroUtilsSideEffect {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public final transient TestPipeline pipeline2 = TestPipeline.create();
    public final Schema testSchema = SchemaBuilder
            .record("record").namespace("test")
            .fields()
            .name("timestamp").type().longBuilder()
                .prop("logicalType", "timestamp-millis")
                .endLong().noDefault()
            .endRecord();
    public final GenericRecord record = new GenericRecordBuilder(testSchema)
            .set("timestamp", new Timestamp(1563926400000L).getTime())
            .build();


    @org.junit.Test
    public void test() {
        pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
                .apply(Combine.globally(new TestFn()));

        pipeline.run().waitUntilFinish();
    }
    @org.junit.Test
    public void test2() {

        // The returned Beam schema is discarded; only the call itself matters.
        AvroUtils.toBeamSchema(testSchema);

        pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
                .apply(Combine.globally(new TestFn()));

        pipeline2.run().waitUntilFinish();
    }

    public static class TestFn
            implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {

        @Override
        public GenericRecord apply(Iterable<GenericRecord> input) {
            for (GenericRecord item : input) {
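                if (item != null) {
                    // In test2 the class check fails: the field comes back as
                    // org.joda.time.DateTime instead of Long.
                    assertEquals(Long.class, item.get("timestamp").getClass());
                    assertEquals(1563926400000L, item.get("timestamp"));
                }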
                return item;
            }
            return null;
        }
    }
}
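The failure in test2 is consistent with a utility class mutating shared, JVM-wide state the first time it is used. The Java sketch below is a hypothetical model of that mechanism, not Beam or Avro code: SharedModel stands in for a global data model that decoders consult for logical-type conversions, SchemaUtil stands in for a class whose static initializer registers a conversion, and java.time.Instant stands in for Joda's DateTime. Whether AvroUtils really works this way is an assumption that this report does not confirm.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of the suspected side effect; none of these classes are Beam or Avro APIs.
public class GlobalConversionSideEffect {

    // Stands in for a JVM-wide, mutable data model that decoders consult for
    // logical-type conversions.
    public static final class SharedModel {
        private static final Map<String, Function<Long, Object>> CONVERSIONS =
                new ConcurrentHashMap<>();

        public static void addConversion(String logicalType, Function<Long, Object> fn) {
            CONVERSIONS.put(logicalType, fn);
        }

        // Applies a registered conversion if there is one, otherwise returns the raw long.
        public static Object decode(String logicalType, long raw) {
            Function<Long, Object> fn = CONVERSIONS.get(logicalType);
            return fn == null ? Long.valueOf(raw) : fn.apply(raw);
        }
    }

    // Stands in for a schema utility whose static initializer registers a conversion.
    // The initializer runs once, the first time a static method of the class is called.
    public static final class SchemaUtil {
        static {
            SharedModel.addConversion("timestamp-millis", Instant::ofEpochMilli);
        }

        public static String toSchema(String name) {
            return "schema:" + name; // the result is irrelevant to the side effect
        }
    }

    public static void main(String[] args) {
        long millis = 1563926400000L;
        Object before = SharedModel.decode("timestamp-millis", millis);
        System.out.println(before.getClass().getSimpleName()); // prints "Long"

        SchemaUtil.toSchema("record"); // return value discarded, as in test2

        Object after = SharedModel.decode("timestamp-millis", millis);
        System.out.println(after.getClass().getSimpleName()); // prints "Instant"
    }
}
```

If the real cause is shared state of this kind, it persists for the rest of the JVM, so test could presumably fail as well if JUnit happened to run test2 before it.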

Thanks,
Paolo

-- 
Paolo Tomeo, PhD

Big Data and Machine Learning Engineer

linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>
