Hi Tim, thanks for the explanation; it makes more sense now why it
was failing. :)

I opened a Jira ticket https://issues.apache.org/jira/browse/BEAM-5511 for
this matter.


On Wed, Sep 26, 2018 at 1:40 PM Tim Robertson <timrobertson...@gmail.com>
wrote:
> Hi Juan
> Well done for diagnosing your issue and thank you for taking the time to
> report it here.
> I'm not the author of this section, but I've taken a quick look at the code
> and inline comments and have some observations which I think might help
> explain it.
> I notice it writes into temporary files and uses a HashMap<DestinationT,
> Writer> to maintain a pool of writers, one per destination. I presume
> that you are receiving a new instance of the DestinationT object on each
> call. Without equals / hashCode overrides those instances compare by
> identity, so the HashMap treats them as separate entries and a new writer
> is created for each entry in the HashMap. The method
> responsible for providing the DestinationT is the following from
> FileBasedSink, which does document the expectation:
> /**
>  * Returns an object that represents at a high level the destination being written to. May not
>  * return null. A destination must have deterministic hash and equality methods defined.
>  */
> public abstract DestinationT getDestination(UserT element);
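
A minimal sketch of the HashMap behaviour described above, in plain Java without Beam. The class names and the StringBuilder standing in for a writer are hypothetical; the sketch only shows how a pool keyed by a fresh destination instance per element grows when the key type lacks value-based equals / hashCode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class WriterPoolDemo {

    // Hypothetical destination WITHOUT equals/hashCode: compared by object identity.
    static final class IdentityDestination {
        final String type;
        final int day;

        IdentityDestination(String type, int day) {
            this.type = type;
            this.day = day;
        }
    }

    // Same fields, but with value-based equals/hashCode.
    static final class ValueDestination {
        final String type;
        final int day;

        ValueDestination(String type, int day) {
            this.type = type;
            this.day = day;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueDestination)) return false;
            ValueDestination that = (ValueDestination) o;
            return day == that.day && type.equals(that.type);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, day);
        }
    }

    // Number of pooled "writers" when every element gets a fresh identity-based key.
    static int writersWithIdentityKeys(int elements) {
        Map<IdentityDestination, StringBuilder> writers = new HashMap<>();
        for (int i = 0; i < elements; i++) {
            writers.computeIfAbsent(new IdentityDestination("tracking", 11), d -> new StringBuilder())
                    .append(i).append('\n');
        }
        return writers.size();
    }

    // Number of pooled "writers" when every element gets a fresh value-based key.
    static int writersWithValueKeys(int elements) {
        Map<ValueDestination, StringBuilder> writers = new HashMap<>();
        for (int i = 0; i < elements; i++) {
            writers.computeIfAbsent(new ValueDestination("tracking", 11), d -> new StringBuilder())
                    .append(i).append('\n');
        }
        return writers.size();
    }

    public static void main(String[] args) {
        System.out.println("identity keys: " + writersWithIdentityKeys(100) + " writers");
        System.out.println("value keys:    " + writersWithValueKeys(100) + " writers");
    }
}
```

With identity-based keys every element ends up with its own writer, which matches the "1 record per file" symptom reported earlier in the thread.
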
> Beyond that, I notice that it also relies on a hash code computed from the
> serialised object (i.e. after running through the coder), which you note
> too. The inline doc explains the reasoning: hashCode is not guaranteed to
> be stable across machines. When elements are
> processed on different machines we need deterministic behaviour to direct
> them to the correct target shard. To do that the code opts to use a murmur3_32
> algorithm, which is safe across machines (today I learnt!), and it operates
> on the encoded bytes of the object, which are expected to be deterministic.
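
The idea of hashing the encoded bytes rather than calling hashCode can be sketched as follows. This is not Beam's code: CRC32 is swapped in for murmur3_32 only because it ships with the JDK, and the Destination class, encode method and shardFor helper are hypothetical stand-ins.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.CRC32;

class EncodedHashDemo {

    // Hypothetical destination with no hashCode override at all.
    static final class Destination {
        final String type;
        final int year;
        final int month;
        final int day;

        Destination(String type, int year, int month, int day) {
            this.type = type;
            this.year = year;
            this.month = month;
            this.day = day;
        }
    }

    // Stand-in for a deterministic coder: fixed field order, fixed-width ints.
    static byte[] encode(Destination d) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(d.type);
            out.writeInt(d.year);
            out.writeInt(d.month);
            out.writeInt(d.day);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hash of the encoded form; depends only on the bytes, not on the JVM or machine.
    // CRC32 is an assumption made for self-containedness, not what Beam uses.
    static long hashEncoded(Destination d) {
        CRC32 crc = new CRC32();
        crc.update(encode(d));
        return crc.getValue();
    }

    // Maps a destination to one of numShards shards using the byte-level hash.
    static int shardFor(Destination d, int numShards) {
        return (int) (hashEncoded(d) % numShards);
    }

    public static void main(String[] args) {
        Destination a = new Destination("tracking", 2018, 9, 11);
        Destination b = new Destination("tracking", 2018, 9, 11);
        System.out.println("same encoded hash: " + (hashEncoded(a) == hashEncoded(b)));
        System.out.println("same shard:        " + (shardFor(a, 4) == shardFor(b, 4)));
    }
}
```

Two separate instances with the same field values land on the same shard here even though neither overrides hashCode, which is why the sharding step depends on the coder while the writer pool still depends on equals / hashCode.
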
> I agree that we should improve the documentation and state that hashCode
> and equals need to be implemented when user-defined objects are used for
> the dynamic destination. Would you mind opening a Jira for that please?
> I hope this helps a little, and thanks again
> Tim
> On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>> Hi guys, after days of bumping my head against the monitor I found why it
>> was not working.
>> One key element when using *DynamicAvroDestinations* that is not
>> described in the documentation is that, if you are using a regular POJO as
>> *DestinationT* like I am (and not Long/String/Integer as in the example):
>> {code}
>>    DynamicAvroDestinations<String, GenericRecordDynamicDestination,
>> GenericRecord>
>> {code}
>> It's very important to pay attention to the equals / hashCode
>> implementations, which should be aligned with your
>> sharding/grouping/partition structure. Not doing so will give you the
>> result I described earlier (1 file (or shard) with 1 record only, or
>> sometimes just an exception).
>> I still don't understand why it depends on equals / hashCode, though: when I
>> checked the class at
>>   *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
>> the hashing depends on the Coder itself (method: <DestinationT> int
>> hashDestination(DestinationT destination, Coder<DestinationT>
>> destinationCoder)).
>> Maybe a core member could explain the reason for it, or whether it's
>> unexpected behavior and there is a bug somewhere else.
>> In my case, below you can find my POJO destination along with the
>> corresponding Coder implementation, which works correctly as long as
>> equals / hashCode are implemented:
>> {code}
>> static class GenericRecordDynamicDestination {
>>         private String logicalType;
>>         private final int year;
>>         private final int month;
>>         private final int day;
>>         public GenericRecordDynamicDestination(final String _logicalType, final int _year, final int _month, final int _day) {
>>             logicalType = _logicalType;
>>             year = _year;
>>             month = _month;
>>             day = _day;
>>         }
>>         public String getLogicalType() {
>>             return logicalType;
>>         }
>>         public void setLogicalType(final String _logicalType) {
>>             logicalType = _logicalType;
>>         }
>>         public int getYear() {
>>             return year;
>>         }
>>         public int getMonth() {
>>             return month;
>>         }
>>         public int getDay() {
>>             return day;
>>         }
>>         @Override
>>         public boolean equals(final Object _o) {
>>             if (this == _o) return true;
>>             if (_o == null || getClass() != _o.getClass()) return false;
>>             final GenericRecordDynamicDestination that = (GenericRecordDynamicDestination) _o;
>>             if (year != that.year) return false;
>>             if (month != that.month) return false;
>>             if (day != that.day) return false;
>>             return logicalType.equals(that.logicalType);
>>         }
>>         @Override
>>         public int hashCode() {
>>             int result = logicalType.hashCode();
>>             result = 31 * result + year;
>>             result = 31 * result + month;
>>             result = 31 * result + day;
>>             return result;
>>         }
>> }
>> static class GenericRecordDestinationCoder extends CustomCoder<GenericRecordDynamicDestination> {
>>         @Override
>>         public void encode(final GenericRecordDynamicDestination value, final OutputStream outStream) throws IOException {
>>             final ObjectOutputStream out = new ObjectOutputStream(outStream);
>>             out.writeUTF(value.getLogicalType());
>>             out.writeInt(value.getYear());
>>             out.writeInt(value.getMonth());
>>             out.writeInt(value.getDay());
>>             out.flush();
>>         }
>>         @Override
>>         public GenericRecordDynamicDestination decode(final InputStream inStream) throws IOException {
>>             final ObjectInputStream in = new ObjectInputStream(inStream);
>>             String logicalType = in.readUTF();
>>             int year = in.readInt();
>>             int month = in.readInt();
>>             int day = in.readInt();
>>             return new GenericRecordDynamicDestination(logicalType, year, month, day);
>>         }
>>         @Override
>>         public void verifyDeterministic() throws NonDeterministicException {
>>             // Intentionally empty: encode() writes the same fields in the same
>>             // order for equal destinations, so the encoding is deterministic.
>>         }
>> }
>> {code}
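
One way to sanity-check a destination / coder pair like the one above before running a pipeline is a small round-trip test. The sketch below is plain Java without Beam on the classpath: Dest, encode, decode and roundTrips are hypothetical stand-ins for the classes above, and DataOutputStream replaces ObjectOutputStream only to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Objects;

class CoderRoundTripCheck {

    // Hypothetical immutable stand-in for the destination POJO.
    static final class Dest {
        final String logicalType;
        final int year;
        final int month;
        final int day;

        Dest(String logicalType, int year, int month, int day) {
            this.logicalType = logicalType;
            this.year = year;
            this.month = month;
            this.day = day;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Dest)) return false;
            Dest that = (Dest) o;
            return year == that.year && month == that.month && day == that.day
                    && logicalType.equals(that.logicalType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(logicalType, year, month, day);
        }
    }

    // Writes the fields in a fixed order.
    static byte[] encode(Dest d) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(d.logicalType);
            out.writeInt(d.year);
            out.writeInt(d.month);
            out.writeInt(d.day);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order they were written.
    static Dest decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            String logicalType = in.readUTF();
            int year = in.readInt();
            int month = in.readInt();
            int day = in.readInt();
            return new Dest(logicalType, year, month, day);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True if the decoded copy is equal, hashes equally, and re-encodes to the same bytes.
    static boolean roundTrips(Dest d) {
        byte[] first = encode(d);
        Dest copy = decode(first);
        return copy.equals(d)
                && copy.hashCode() == d.hashCode()
                && Arrays.equals(first, encode(copy));
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(new Dest("tracking", 2018, 9, 11)));
    }
}
```

If equals / hashCode were removed from Dest, roundTrips would return false, since the decoded copy is always a new instance.
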
>> On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <jcgarc...@gmail.com>
>> wrote:
>>> I am writing a pipeline that will read from Kafka and convert the data
>>> into Avro files with fixed windows of 10 min.
>>> I am using *DynamicAvroDestinations* in order to build a dynamic path
>>> and select the corresponding schema based on the incoming data.
>>> 1.)
>>> While testing on my machine (with DirectRunner) I am using a file
>>> (BoundedSource) containing hundreds of these messages and feeding my
>>> pipeline with it; however, I have found that sometimes the pipeline fails
>>> with:
>>> {code}
>>> Caused by: java.nio.file.FileAlreadyExistsException: /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
>>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
>>> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>>> at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>>> at java.nio.file.Files.copy(Files.java:1274)
>>> at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
>>> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
>>> at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
>>> at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
>>> {code}
>>> 2.) I am trying to group all related messages into a single Avro file
>>> (YYYYMMDD-HHMM). When the pipeline doesn't fail using the DirectRunner,
>>> the generated Avro files contain only 1 record per file.
>>> If I generate a pseudo-random name in *getFilenamePolicy*, everything
>>> works, but I end up with hundreds of files, each of them containing 1
>>> record.
>>> My pipeline steps are:
>>> --ReadFromSource
>>> --ApplyWindows
>>> (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration()))))
>>> --Create a KV from KafkaRecord (the key is the date as YYYYMMDDHHMM)
>>> --GroupByKey (this returns a KV<Long, Iterable<String>>)
>>> --Emit each record of the iterable as a KV<Long, String>
>>> --AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
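
The keying step in the list above can be sketched in plain Java. The thread does not show the actual key code, so everything here is an assumption: the UTC time zone, the keyFor and windowStart helper names, and the epoch-aligned 10-minute window used for comparison.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class WindowKeyDemo {

    // Formats an instant as YYYYMMDDHHMM in UTC (the time zone is an assumption).
    private static final DateTimeFormatter KEY_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    // Key as described in the step list: the date as YYYYMMDDHHMM, as a long.
    static long keyFor(Instant timestamp) {
        return Long.parseLong(KEY_FORMAT.format(timestamp));
    }

    // Start of the epoch-aligned fixed window of the given size that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long start = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
        return Instant.ofEpochMilli(start);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2018-09-11T10:47:30Z");
        Instant start = windowStart(ts, Duration.ofMinutes(10));
        System.out.println("per-minute key: " + keyFor(ts));    // 201809111047
        System.out.println("per-window key: " + keyFor(start)); // 201809111040
    }
}
```

A minute-resolution key yields up to ten distinct keys inside one 10-minute window, so whether the key should be truncated to the window start depends on how many groups per window are intended.
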
>>> Also, please find below the code for getFilenamePolicy:
>>> {code}
>>> @Override
>>> public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
>>>     return new FileBasedSink.FilenamePolicy() {
>>>         @Override
>>>         public ResourceId windowedFilename(final int shardNumber, final int numShards, final BoundedWindow window, final PaneInfo paneInfo, final FileBasedSink.OutputFileHints outputFileHints) {
>>>             StringBuilder path = new StringBuilder(filesLocation);
>>>             if (!filesLocation.endsWith("/")) {
>>>                 path.append("/");
>>>             }
>>>             path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>>>                     .append("/")
>>>                     .append(filesPrefix)
>>>                     .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>>>                     .append("-w-").append(window.maxTimestamp().getMillis())
>>>                     .append("-").append(shardNumber)
>>>                     .append("_").append(numShards)
>>>                     .append(AVRO_SUFFIX);
>>>             return FileSystems.matchNewResource(path.toString(), false);
>>>         }
>>>         @Nullable
>>>         @Override
>>>         public ResourceId unwindowedFilename(final int shardNumber, final int numShards, final FileBasedSink.OutputFileHints outputFileHints) {
>>>             throw new PipelineException("unwindowedFilename is not supported");
>>>         }
>>>     };
>>> }
>>> {code}
>>> Thanks
>>> --
>>> JC
>> --
>> JC


