It seems likely this is a side effect of some static initialization in
AvroUtils:
https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
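
To illustrate the kind of mechanism I have in mind, here is a minimal sketch. GlobalModel and Utils are hypothetical stand-ins, not Avro or Beam classes, and the registered conversion is an assumption made for illustration rather than a statement of what AvroUtils actually does. The point is only that merely touching a class with a static initializer can change how unrelated code decodes values afterwards.

```java
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class StaticInitSideEffect {

    // Hypothetical stand-in for a process-wide singleton data model.
    static final class GlobalModel {
        private static final Map<String, Function<Long, Object>> CONVERSIONS =
                new ConcurrentHashMap<>();

        static void addConversion(String logicalType, Function<Long, Object> fn) {
            CONVERSIONS.put(logicalType, fn);
        }

        // Returns the raw long unless a conversion is registered for the logical type.
        static Object decode(String logicalType, long raw) {
            Function<Long, Object> fn = CONVERSIONS.get(logicalType);
            return fn == null ? (Object) raw : fn.apply(raw);
        }
    }

    // Hypothetical stand-in for a utility class whose static initializer
    // mutates the shared model the first time the class is used.
    static final class Utils {
        static {
            GlobalModel.addConversion("timestamp-millis", raw -> new Date(raw));
        }

        static String describe(String schemaName) {
            return "schema:" + schemaName;
        }
    }

    public static void main(String[] args) {
        // Utils has not been initialized yet, so the value is still a Long.
        Object before = GlobalModel.decode("timestamp-millis", 1563926400000L);
        System.out.println(before.getClass().getSimpleName());

        // An apparently unrelated call triggers Utils' static initializer.
        Utils.describe("record");

        // The same decode call now goes through the registered conversion.
        Object after = GlobalModel.decode("timestamp-millis", 1563926400000L);
        System.out.println(after.getClass().getSimpleName());
    }
}
```

If this is what is happening, the result would depend on whether the utility class was loaded earlier in the same JVM, which might also explain why the test passes in one environment and fails in another.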

On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <[email protected]> wrote:

> I've copied this failing test into my client, and it passes for me. I
> can't reproduce the failure.
>
> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <[email protected]> wrote:
>
>> +dev <[email protected]> +Brian Hulette <[email protected]> +Reuven
>> Lax <[email protected]>
>>
>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I think the method AvroUtils.toBeamSchema has an unexpected side
>>> effect.
>>> I found that if you invoke it and then run a pipeline of
>>> GenericRecords containing a timestamp (I tried with the logical type
>>> timestamp-millis), Beam converts that timestamp from long to
>>> org.joda.time.DateTime, even if you don't apply any transformation to
>>> the pipeline.
>>> Do you think it's a bug?
>>>
>>> Below you can find a simple test class I wrote in order to replicate the
>>> problem.
>>> The first test passes while the second fails.
>>>
>>>
>>> import org.apache.avro.Schema;
>>> import org.apache.avro.SchemaBuilder;
>>> import org.apache.avro.generic.GenericRecord;
>>> import org.apache.avro.generic.GenericRecordBuilder;
>>> import org.apache.beam.sdk.coders.AvroCoder;
>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
>>> import org.apache.beam.sdk.testing.TestPipeline;
>>> import org.apache.beam.sdk.transforms.Combine;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.transforms.SerializableFunction;
>>> import org.junit.Rule;
>>>
>>> import java.sql.Timestamp;
>>>
>>> import static org.junit.Assert.assertEquals;
>>>
>>> public class AvroUtilsSideEffect {
>>>
>>>     @Rule
>>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>>     @Rule
>>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
>>>     public final Schema testSchema = SchemaBuilder
>>>             .record("record").namespace("test")
>>>             .fields()
>>>             .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
>>>             .endRecord();
>>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
>>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
>>>             .build();
>>>
>>>
>>>     @org.junit.Test
>>>     public void test() {
>>>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>>                 .apply(Combine.globally(new TestFn()));
>>>
>>>         pipeline.run().waitUntilFinish();
>>>     }
>>>     @org.junit.Test
>>>     public void test2() {
>>>
>>>         AvroUtils.toBeamSchema(testSchema);
>>>
>>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>>                 .apply(Combine.globally(new TestFn()));
>>>
>>>         pipeline2.run().waitUntilFinish();
>>>     }
>>>
>>>     public static class TestFn implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
>>>
>>>         @Override
>>>         public GenericRecord apply(Iterable<GenericRecord> input) {
>>>             for (GenericRecord item : input) {
>>>                 if(item != null){
>>>                     assertEquals(Long.class, item.get("timestamp").getClass());
>>>                     assertEquals(1563926400000L, item.get("timestamp"));
>>>                 }
>>>                 return item;
>>>             }
>>>             return null;
>>>         }
>>>     }
>>> }
>>>
>>> Thanks,
>>> Paolo
>>>
>>> --
>>> Paolo Tomeo, PhD
>>>
>>> Big Data and Machine Learning Engineer
>>>
>>> linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>
>>>
>>
