I am writing a pipeline that reads from Kafka and converts the data into Avro files, using fixed windows of 10 minutes.
I am using *DynamicAvroDestinations* to build a dynamic path and select the corresponding schema based on the incoming data.

1.) While testing on my machine (with the DirectRunner), I feed the pipeline from a file (a BoundedSource) containing hundreds of these messages. However, I have found that the pipeline sometimes fails with:
{code}
Caused by: java.nio.file.FileAlreadyExistsException: /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
    at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
    at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
{code}

2.) I am trying to group all related messages into a single Avro file (YYYYMMDD-HHMM). When the pipeline doesn't fail with the DirectRunner, the generated Avro files contain only 1 record each. If I generate a pseudo-random name in *getFilenamePolicy*, everything works, but I end up with hundreds of files, each containing 1 record.
My pipeline steps are:
-- ReadFromSource
-- ApplyWindows (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
-- Create a KV from the KafkaRecord (the key is the date as YYYYMMDDHHMM)
-- GroupByKey (this returns a KV<Long, Iterable<String>>)
-- Emit each record of the iterable as a KV<Long, String>
-- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())

Please find below the code for getFilenamePolicy:
{code}
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
    return new FileBasedSink.FilenamePolicy() {
        @Override
        public ResourceId windowedFilename(final int shardNumber,
                                           final int numShards,
                                           final BoundedWindow window,
                                           final PaneInfo paneInfo,
                                           final FileBasedSink.OutputFileHints outputFileHints) {
            // Base directory, always terminated by a slash.
            StringBuilder path = new StringBuilder(filesLocation);
            if (!filesLocation.endsWith("/")) {
                path.append("/");
            }

            // <location>/yyyy/MM/<prefix>-day-dd-w-<window max millis>-<shard>_<numShards>.avro
            path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
                .append("/")
                .append(filesPrefix)
                .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
                .append("-w-").append(window.maxTimestamp().getMillis())
                .append("-").append(shardNumber)
                .append("_").append(numShards)
                .append(AVRO_SUFFIX);

            return FileSystems.matchNewResource(path.toString(), false);
        }

        @Nullable
        @Override
        public ResourceId unwindowedFilename(final int shardNumber,
                                             final int numShards,
                                             final FileBasedSink.OutputFileHints outputFileHints) {
            throw new PipelineException("unwindowedFilename is not supported");
        }
    };
}
{code}

Thanks
-- JC
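
To make the naming scheme above concrete, here is a minimal stand-alone sketch of the path that the policy builds. It uses only the JDK, not Beam. The class name `FilenameSketch` and the method `buildPath` are hypothetical, and it assumes that `DateTimeUtil.format` formats the destination timestamp in UTC with the given pattern (that helper is not shown above, so this is a guess). The example values (`/tmp/test-tracking`, `tracking`, shard `0` of `4`) are taken from the path in the stack trace; the timestamps are made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the windowedFilename logic; not part of Beam.
class FilenameSketch {
    private static final String AVRO_SUFFIX = ".avro";

    // Assumption: DateTimeUtil.format(ts, pattern) behaves like these UTC formatters.
    private static final DateTimeFormatter YEAR_MONTH =
            DateTimeFormatter.ofPattern("yyyy/MM").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("dd").withZone(ZoneOffset.UTC);

    // Mirrors the string concatenation in windowedFilename.
    static String buildPath(String filesLocation, String filesPrefix,
                            long destinationMillis, long windowMaxMillis,
                            int shardNumber, int numShards) {
        Instant destinationTs = Instant.ofEpochMilli(destinationMillis);
        StringBuilder path = new StringBuilder(filesLocation);
        if (!filesLocation.endsWith("/")) {
            path.append("/");
        }
        path.append(YEAR_MONTH.format(destinationTs))
            .append("/")
            .append(filesPrefix)
            .append("-day-").append(DAY.format(destinationTs))
            .append("-w-").append(windowMaxMillis)
            .append("-").append(shardNumber)
            .append("_").append(numShards)
            .append(AVRO_SUFFIX);
        return path.toString();
    }

    public static void main(String[] args) {
        // Last millisecond of the 10-minute window [10:00, 10:10) on 2018-09-11 (UTC).
        long windowMax = Instant.parse("2018-09-11T10:10:00Z").toEpochMilli() - 1;

        // Two made-up destination timestamps on the same day, inside the same window.
        long destinationA = Instant.parse("2018-09-11T10:03:00Z").toEpochMilli();
        long destinationB = Instant.parse("2018-09-11T10:07:00Z").toEpochMilli();

        String pathA = buildPath("/tmp/test-tracking", "tracking", destinationA, windowMax, 0, 4);
        String pathB = buildPath("/tmp/test-tracking", "tracking", destinationB, windowMax, 0, 4);

        System.out.println(pathA);
        System.out.println(pathB);
        System.out.println("same path: " + pathA.equals(pathB));
    }
}
```

In this sketch both calls return `/tmp/test-tracking/2018/09/tracking-day-11-w-1536660599999-0_4.avro`, because only the year, month, and day of the destination timestamp enter the name, together with the window and the shard numbers. Whether two distinct destinations can actually end up in the same day and window depends on how `GenericRecordDestination` is defined, which is not shown here.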