Hi Juan,

Well done diagnosing your issue, and thank you for taking the time to report it here.
I'm not the author of this section, but I've taken a quick look at the code and its inline comments and have some observations which I think might help explain the behaviour.

The write path writes into temporary files and uses a HashMap<DestinationT, Writer> to maintain a pool of writers, one per destination. I presume you are receiving a new instance of the DestinationT object on each call, so without equals/hashCode the HashMap treats each instance as a separate key, and a new writer is created for every entry in the map.

The method responsible for providing the DestinationT is the following, from FileBasedSink, which does document the expectation:

{code}
/**
 * Returns an object that represents at a high level the destination being written to. May not
 * return null. A destination must have deterministic hash and equality methods defined.
 */
public abstract DestinationT getDestination(UserT element);
{code}

Beyond that, I notice the sharding relies on a hash of the serialised object (i.e. after running it through the coder), which you noted too. The inline doc explains the reasoning: Java's hashCode is not guaranteed to be stable across machines, and when elements are processed on different machines we need deterministic behaviour to direct each element to the correct target shard. The code therefore uses a murmur3_32 hash, which is stable across machines (today I learnt!), applied to the encoded bytes of the object, which are required to be deterministic.

I agree that we should improve the documentation and state that hashCode and equals need to be implemented when user-defined objects are used for the dynamic destination. Would you mind opening a Jira for that, please?

I hope this helps a little, and thanks again,

Tim

On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
> Hi Guys, after days of bumping my head against the monitor I found why it
> was not working.
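To illustrate the writer-pool point above: the snippet below is a minimal, stdlib-only sketch (the class and field names are illustrative, not Beam's internals) showing why a DestinationT without value-based equals/hashCode produces one writer per element.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DestinationKeyDemo {
    // Hypothetical destination type WITHOUT equals/hashCode overrides:
    // every instance is a distinct HashMap key, so every element would
    // get its own writer in a HashMap<DestinationT, Writer> pool.
    static class NaiveDest {
        final String table;
        NaiveDest(String table) { this.table = table; }
    }

    // Same shape WITH value-based equals/hashCode: logically equal
    // destinations collapse onto a single key (a single writer).
    static class GoodDest {
        final String table;
        GoodDest(String table) { this.table = table; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodDest && ((GoodDest) o).table.equals(table);
        }
        @Override public int hashCode() { return Objects.hash(table); }
    }

    public static void main(String[] args) {
        Map<NaiveDest, String> naive = new HashMap<>();
        naive.put(new NaiveDest("events"), "writer-1");
        naive.put(new NaiveDest("events"), "writer-2");

        Map<GoodDest, String> good = new HashMap<>();
        good.put(new GoodDest("events"), "writer-1");
        good.put(new GoodDest("events"), "writer-2");

        // Identity-based keys: two entries. Value-based keys: one entry.
        System.out.println(naive.size() + " " + good.size()); // prints 2 1
    }
}
```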
>
> One key element when using *DynamicAvroDestinations* that is not
> described in the documentation is that, if you are using a regular POJO as
> *DestinationT* like I am (and not Long/String/Integer as in the example):
>
> {code}
> DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
> {code}
>
> It's very important to pay attention to the equals / hashCode
> implementations, which should be aligned with your
> sharding/grouping/partition structure. Not doing so will give you the
> result I described earlier (1 file (or shard) with 1 record only, or
> sometimes just an exception).
>
> While I still don't understand why it depends on equals / hashCode, as I
> checked the class at:
> *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
>
> The hashing depends on the Coder itself (method: <DestinationT> int
> hashDestination(DestinationT destination, Coder<DestinationT> destinationCoder)).
>
> Maybe a core member could explain the reason for it, or it's unexpected
> behavior and there is a bug somewhere else.
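The hashDestination behaviour can be sketched outside Beam: encode the destination to bytes (as a coder would), then hash those bytes with a fully-specified function, so logically equal destinations land on the same shard on any machine. This is a stdlib-only illustration of the idea, not Beam's code — Beam uses murmur3_32 over the coder output, while here Arrays.hashCode (whose arithmetic is fully specified and thus machine-stable) stands in as the byte hash, and the field names are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class ShardingSketch {
    // Encode the destination fields to bytes, mimicking what a coder does.
    static byte[] encode(String logicalType, int year, int month, int day)
            throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(logicalType);
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.flush();
        return bos.toByteArray();
    }

    // Hash the encoded bytes, not Object.hashCode(), so the shard choice
    // does not depend on which JVM or machine computes it.
    static int shardFor(byte[] encoded, int numShards) {
        return Math.floorMod(Arrays.hashCode(encoded), numShards);
    }

    public static void main(String[] args) throws IOException {
        // Two separate "instances" of the same destination encode to
        // identical bytes, so they always pick the same shard.
        int a = shardFor(encode("tracking", 2018, 9, 11), 10);
        int b = shardFor(encode("tracking", 2018, 9, 11), 10);
        System.out.println(a == b); // prints true
    }
}
```

The key property is that the encoding must be deterministic; if the coder emitted different bytes for equal objects, equal destinations would scatter across shards.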
>
> In my case, below you can find my POJO Destination along with the
> corresponding Coder implementation, which works correctly as long as
> equals / hashCode are implemented:
>
> {code}
> static class GenericRecordDynamicDestination {
>     private String logicalType;
>     private final int year;
>     private final int month;
>     private final int day;
>
>     public GenericRecordDynamicDestination(final String _logicalType, final int _year, final int _month, final int _day) {
>         logicalType = _logicalType;
>         year = _year;
>         month = _month;
>         day = _day;
>     }
>
>     public String getLogicalType() {
>         return logicalType;
>     }
>
>     public void setLogicalType(final String _logicalType) {
>         logicalType = _logicalType;
>     }
>
>     public int getYear() {
>         return year;
>     }
>
>     public int getMonth() {
>         return month;
>     }
>
>     public int getDay() {
>         return day;
>     }
>
>     @Override
>     public boolean equals(final Object _o) {
>         if (this == _o) return true;
>         if (_o == null || getClass() != _o.getClass()) return false;
>
>         final GenericRecordDynamicDestination that = (GenericRecordDynamicDestination) _o;
>
>         if (year != that.year) return false;
>         if (month != that.month) return false;
>         if (day != that.day) return false;
>         return logicalType.equals(that.logicalType);
>     }
>
>     @Override
>     public int hashCode() {
>         int result = logicalType.hashCode();
>         result = 31 * result + year;
>         result = 31 * result + month;
>         result = 31 * result + day;
>         return result;
>     }
> }
>
> static class GenericRecordDestinationCoder extends CustomCoder<GenericRecordDynamicDestination> {
>     @Override
>     public void encode(final GenericRecordDynamicDestination value, final OutputStream outStream) throws IOException {
>         final ObjectOutputStream out = new ObjectOutputStream(outStream);
>         out.writeUTF(value.getLogicalType());
>         out.writeInt(value.getYear());
>         out.writeInt(value.getMonth());
>         out.writeInt(value.getDay());
>         out.flush();
>     }
>
>     @Override
>     public GenericRecordDynamicDestination decode(final InputStream inStream) throws IOException {
>         final ObjectInputStream in = new ObjectInputStream(inStream);
>         String logicalType = in.readUTF();
>         int year = in.readInt();
>         int month = in.readInt();
>         int day = in.readInt();
>         return new GenericRecordDynamicDestination(logicalType, year, month, day);
>     }
>
>     @Override
>     public void verifyDeterministic() throws NonDeterministicException {
>         // no-op: the encoding above is deterministic
>     }
> }
> {code}
>
>
> On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>
>> I am writing a pipeline that will read from Kafka and convert the data
>> into Avro files with a fixed window of 10 min.
>>
>> I am using a *DynamicAvroDestinations* in order to build a dynamic path
>> and select the corresponding schema based on the incoming data.
>>
>> 1.)
>> While testing on my machine (with DirectRunner) I am using a file
>> (BoundedSource) containing hundreds of these messages and feeding my
>> pipeline with it; however, I have found that sometimes the pipeline fails
>> with:
>> {code}
>> Caused by: java.nio.file.FileAlreadyExistsException:
>> /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
>> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>> at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>> at java.nio.file.Files.copy(Files.java:1274)
>> at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
>> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
>> at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
>> at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
>> {code}
>>
>> 2.) Trying to group all related messages into a single Avro file
>> (YYYYMMDD-HHMM): when the pipeline doesn't fail using the DirectRunner,
>> the generated Avro files only contain 1 record per file.
>>
>> If I generate a pseudo-random name in *getFilenamePolicy*, everything
>> works, but I end up with hundreds of files, each of them containing 1
>> record.
>>
>> My pipeline steps are:
>> -- ReadFromSource
>> -- ApplyWindows (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
>> -- Create a KV from KafkaRecord (the key is the date as YYYYMMDDHHMM)
>> -- GroupByKey (this returns a KV<Long, Iterable<String>>)
>> -- Emit each record of the iterable as KV<Long, String>
>> -- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
>>
>> Also, please find below the code for getFilenamePolicy:
>> {code}
>> @Override
>> public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
>>     return new FileBasedSink.FilenamePolicy() {
>>         @Override
>>         public ResourceId windowedFilename(final int shardNumber, final int numShards, final BoundedWindow window, final PaneInfo paneInfo, final FileBasedSink.OutputFileHints outputFileHints) {
>>             StringBuilder path = new StringBuilder(filesLocation);
>>             if (!filesLocation.endsWith("/")) {
>>                 path.append("/");
>>             }
>>
>>             path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>>                 .append("/")
>>                 .append(filesPrefix)
>>                 .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>>                 .append("-w-").append(window.maxTimestamp().getMillis())
>>                 .append("-").append(shardNumber)
>>                 .append("_").append(numShards)
>>                 .append(AVRO_SUFFIX);
>>
>>             return FileSystems.matchNewResource(path.toString(), false);
>>         }
>>
>>         @Nullable
>>         @Override
>>         public ResourceId unwindowedFilename(final int shardNumber, final int numShards, final FileBasedSink.OutputFileHints outputFileHints) {
>>             throw new PipelineException("unwindowedFilename is not supported");
>>         }
>>     };
>> }
>> {code}
>>
>> Thanks
>> --
>>
>> JC
>>
>
>
> --
>
> JC
>
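As a closing aside, a quick way to sanity-check a custom destination coder of the kind quoted above is to round-trip an instance and confirm both that the decoded object equals the original and that re-encoding yields identical bytes. The sketch below is a stdlib-only stand-in (the `Dest` class mirrors the shape of the quoted POJO, and DataOutputStream replaces the quoted ObjectOutputStream as a simplification to avoid the serialization stream header); it is not Beam code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class CoderRoundTripCheck {
    // Local stand-in for the destination POJO, with the value-based
    // equals/hashCode the thread concluded is required.
    static class Dest {
        final String logicalType; final int year, month, day;
        Dest(String l, int y, int m, int d) { logicalType = l; year = y; month = m; day = d; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Dest)) return false;
            Dest t = (Dest) o;
            return year == t.year && month == t.month && day == t.day
                && logicalType.equals(t.logicalType);
        }
        @Override public int hashCode() { return Objects.hash(logicalType, year, month, day); }
    }

    static byte[] encode(Dest d) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(d.logicalType);
        out.writeInt(d.year);
        out.writeInt(d.month);
        out.writeInt(d.day);
        out.flush();
        return bos.toByteArray();
    }

    static Dest decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new Dest(in.readUTF(), in.readInt(), in.readInt(), in.readInt());
    }

    public static void main(String[] args) throws IOException {
        Dest original = new Dest("tracking", 2018, 9, 20);
        Dest copy = decode(encode(original));
        // The round trip must produce an equal object AND an identical
        // byte encoding, or shard assignment will not be stable.
        System.out.println(original.equals(copy)
            && Arrays.equals(encode(original), encode(copy))); // prints true
    }
}
```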