Hi Juan

Well done for diagnosing your issue and thank you for taking the time to
report it here.

I'm not the author of this section, but I've taken a quick look at the code
and the inline comments, and I have some observations which I think might help
explain it.

I notice it writes into temporary files and uses a HashMap<DestinationT,
Writer> to maintain a pool of writers, one per destination. I presume
that you are receiving a new instance of the DestinationT object on each
call, so without equals/hashCode the HashMap treats each instance as a
separate entry and creates a new writer for it. The method responsible for
providing the DestinationT is the following from FileBasedSink, which does
document the expectation:

/**
 * Returns an object that represents at a high level the destination being written to. May not
 * return null. A destination must have deterministic hash and equality methods defined.
 */
public abstract DestinationT getDestination(UserT element);
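
To illustrate the HashMap behaviour, here is a minimal, self-contained sketch
(the class and field names are invented for the example, not taken from your
pipeline) of what happens when the destination type does not override
equals/hashCode - the writer pool sees every instance as a brand new destination:

import java.util.HashMap;
import java.util.Map;

public class WriterPoolDemo {
    // No equals/hashCode overrides, so HashMap falls back to object identity.
    static class MyDestination {
        final String logicalType;
        MyDestination(String logicalType) { this.logicalType = logicalType; }
    }

    public static void main(String[] args) {
        Map<MyDestination, String> writers = new HashMap<>();
        writers.put(new MyDestination("tracking"), "writer-1");
        writers.put(new MyDestination("tracking"), "writer-2");
        // Prints 2: logically identical destinations get two entries,
        // i.e. two writers and two output files.
        System.out.println(writers.size());
    }
}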


Beyond that, I notice that it also relies on a hash of the serialised object
(i.e. after running it through the coder), which you note too. The inline doc
explains the reasoning: hashCode is not guaranteed to be stable across
machines, and when elements are processed on different machines we need
deterministic behaviour to direct each element to the correct target shard.
To do that the code opts for a murmur3_32 hash (today I learnt!), which is
consistent across machines, and it operates on the encoded bytes of the
object, which are required to be deterministic.
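
For reference, that helper boils down to something like the following sketch
(paraphrased from what I read in WriteFiles, not copied verbatim, so treat the
exact signature as approximate):

import com.google.common.hash.Hashing;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;

// Hash the coder-encoded bytes with murmur3_32 so the same destination
// hashes to the same shard on every worker, independent of Object.hashCode.
static <DestinationT> int hashDestination(
        DestinationT destination, Coder<DestinationT> destinationCoder) throws Exception {
    byte[] encoded = CoderUtils.encodeToByteArray(destinationCoder, destination);
    return Hashing.murmur3_32().hashBytes(encoded).asInt();
}

If I read the code correctly, the sharding hash itself never calls your
hashCode; it is the writer map keyed by DestinationT (mentioned above) that
does, which is why the missing equals/hashCode shows up as one writer - and
one tiny file - per element.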

I agree that we should improve the documentation and state that hashCode
and equals need to be implemented when user-defined objects are used for
the dynamic destination. Would you mind opening a Jira for that please?

I hope this helps a little, and thanks again
Tim

On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia <jcgarc...@gmail.com>
wrote:

> Hi Guys, after days of bumping my head against the monitor I found out why
> it was not working.
>
> One key element when using *DynamicAvroDestinations *that is not
> described in the documentation is that, if you are using a regular POJO as
> *DestinationT* like I am (and not Long/String/Integer as in the example):
>
> {code}
>    DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
> {code}
>
> It's very important to pay attention to the equals / hashCode implementations,
> which should be aligned with your sharding/grouping/partition structure. Not
> doing so will give you the result I described earlier (1 file (or shard)
> with 1 record only, or sometimes just an exception).
>
> While I still don't understand why it depends on equals / hashCode, as I
> checked the class at:
>   *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
>
> The hashing depends on the Coder itself (method: <DestinationT> int
> hashDestination(DestinationT destination, Coder<DestinationT>
> destinationCoder)).
>
> Maybe a core member could explain the reason for it, or it's unexpected
> behavior and there is a bug somewhere else.
>
> In my case, below you can find my POJO Destination along with the
> corresponding Coder implementation, which works correctly as long as
> equals / hashCode are implemented:
>
> {code}
> static class GenericRecordDynamicDestination {
>         private String logicalType;
>         private final int year;
>         private final int month;
>         private final int day;
>
>         public GenericRecordDynamicDestination(final String _logicalType,
> final int _year, final int _month, final int _day) {
>             logicalType = _logicalType;
>             year = _year;
>             month = _month;
>             day = _day;
>         }
>
>         public String getLogicalType() {
>             return logicalType;
>         }
>
>         public void setLogicalType(final String _logicalType) {
>             logicalType = _logicalType;
>         }
>
>         public int getYear() {
>             return year;
>         }
>
>         public int getMonth() {
>             return month;
>         }
>
>         public int getDay() {
>             return day;
>         }
>
>         @Override
>         public boolean equals(final Object _o) {
>             if (this == _o) return true;
>             if (_o == null || getClass() != _o.getClass()) return false;
>
>             final GenericRecordDynamicDestination that =
> (GenericRecordDynamicDestination) _o;
>
>             if (year != that.year) return false;
>             if (month != that.month) return false;
>             if (day != that.day) return false;
>             return logicalType.equals(that.logicalType);
>         }
>
>         @Override
>         public int hashCode() {
>             int result = logicalType.hashCode();
>             result = 31 * result + year;
>             result = 31 * result + month;
>             result = 31 * result + day;
>             return result;
>         }
> }
>
> static class GenericRecordDestinationCoder extends
> CustomCoder<GenericRecordDynamicDestination> {
>         @Override
>         public void encode(final GenericRecordDynamicDestination value,
> final OutputStream outStream) throws IOException {
>             final ObjectOutputStream out = new
> ObjectOutputStream(outStream);
>             out.writeUTF(value.getLogicalType());
>             out.writeInt(value.getYear());
>             out.writeInt(value.getMonth());
>             out.writeInt(value.getDay());
>             out.flush();
>         }
>
>         @Override
>         public GenericRecordDynamicDestination decode(final InputStream
> inStream) throws IOException {
>             final ObjectInputStream in = new ObjectInputStream(inStream);
>             String logicalType = in.readUTF();
>             int year = in.readInt();
>             int month = in.readInt();
>             int day = in.readInt();
>             return new GenericRecordDynamicDestination(logicalType, year,
> month, day);
>         }
>
>         @Override
>         public void verifyDeterministic() throws NonDeterministicException
> {
>             //
>         }
> }
> {code}
>
>
> On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> I am writing a pipeline that will read from Kafka and convert the data
>> into Avro files with a fixed window of 10 min.
>>
>> I am using a *DynamicAvroDestinations *in order to build a dynamic path
>> and select the corresponding schema based on the incoming data.
>>
>> 1.)
>> While testing on my machine (with the DirectRunner) I am using a file
>> (BoundedSource) containing hundreds of these messages and feeding my
>> pipeline with it; however, I have found that sometimes the pipeline fails
>> with:
>> {code}Caused by: java.nio.file.FileAlreadyExistsException:
>> /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
>> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>> at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>> at java.nio.file.Files.copy(Files.java:1274)
>> at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
>> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
>> at
>> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
>> at
>> org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
>> {code}
>>
>> 2.) Trying to group all related messages into a single AVRO file
>> (YYYYMMDD-HHMM): when the pipeline doesn't fail using the DirectRunner,
>> the generated AVRO files contain only 1 record per file.
>>
>> If I generate a pseudo-random name in *getFilenamePolicy*, everything
>> works, but I end up with hundreds of files, each of them containing 1
>> record.
>>
>> My pipeline steps are:
>> -- ReadFromSource
>> -- ApplyWindows
>> (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
>> -- Create a KV from KafkaRecord (the key is the date as YYYYMMDDHHMM)
>> -- GroupByKey (this returns a KV<Long, Iterable<String>>)
>> -- Emit each record of the iterable as (KV<Long, String>)
>> -- AvroIO ( AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords() )
>>
>> Also, please find below the code for the getFilenamePolicy:
>> {code}
>> @Override
>> public FileBasedSink.FilenamePolicy getFilenamePolicy(final
>> GenericRecordDestination destination) {
>>     return new FileBasedSink.FilenamePolicy() {
>>         @Override
>>         public ResourceId windowedFilename(final int shardNumber, final
>> int numShards, final BoundedWindow window, final PaneInfo paneInfo, final
>> FileBasedSink.OutputFileHints outputFileHints) {
>>
>>             StringBuilder path = new StringBuilder(filesLocation);
>>             if (!filesLocation.endsWith("/")) {
>>                 path.append("/");
>>             }
>>
>>             path.append(DateTimeUtil.format(destination.getTimestamp(),
>> "yyyy/MM"))
>>                     .append("/")
>>                     .append(filesPrefix)
>>
>> .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(),
>> "dd"))
>>
>> .append("-w-").append(window.maxTimestamp().getMillis())
>>                     .append("-").append(shardNumber)
>>                     .append("_").append(numShards)
>>                     .append(AVRO_SUFFIX);
>>
>>             return FileSystems.matchNewResource(path.toString(), false);
>>         }
>>
>>         @Nullable
>>         @Override
>>         public ResourceId unwindowedFilename(final int shardNumber, final
>> int numShards, final FileBasedSink.OutputFileHints outputFileHints) {
>>             throw new PipelineException("unwindowedFilename is not
>> supported");
>>         }
>>     };
>> }
>> {code}
>>
>> Thanks
>> --
>>
>> JC
>>
>>
>
> --
>
> JC
>
>