I am writing a pipeline that reads from Kafka and converts the data into
Avro files using fixed windows of 10 minutes.

I am using *DynamicAvroDestinations* to build a dynamic path and to
select the corresponding schema based on the incoming data.

1.)
While testing on my machine (with the DirectRunner), I feed the pipeline
from a file (BoundedSource) containing hundreds of these messages. However,
I have found that the pipeline sometimes fails with:
{code}
Caused by: java.nio.file.FileAlreadyExistsException: /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
	at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
	at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
	at java.nio.file.Files.copy(Files.java:1274)
	at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
	at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
	at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
	at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
{code}
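One way to read the failing file name: assuming its `-w-` component is `window.maxTimestamp().getMillis()` (as the filename policy in this post writes it), a small plain-Java sketch can decode it. `WindowDecode` is a hypothetical helper, and `BEAM_MIN_TIMESTAMP_MILLIS` is my own constant, assumed to mirror Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE` (`Long.MIN_VALUE` microseconds truncated to milliseconds).

```java
// Decodes the "-w-" component of the failing file name, assuming it holds
// window.maxTimestamp().getMillis() as written by the filename policy.
final class WindowDecode {
    // Assumption: mirrors Beam's BoundedWindow.TIMESTAMP_MIN_VALUE
    // (Long.MIN_VALUE microseconds, truncated to milliseconds).
    static final long BEAM_MIN_TIMESTAMP_MILLIS = Long.MIN_VALUE / 1000;

    // 10-minute fixed windows, as described for this pipeline.
    static final long WINDOW_SIZE_MILLIS = 10 * 60 * 1000L;

    // An interval window's max timestamp is its exclusive end minus 1 ms.
    static long windowEnd(long maxTimestampMillis) {
        return maxTimestampMillis + 1;
    }

    public static void main(String[] args) {
        long maxTs = -9223372036200001L; // from tracking-day-11-w--9223372036200001-0_4.avro
        long end = windowEnd(maxTs);
        System.out.println("end is 10-minute aligned: " + (end % WINDOW_SIZE_MILLIS == 0)); // true
        System.out.println("ms after Beam's minimum timestamp: " + (end - BEAM_MIN_TIMESTAMP_MILLIS)); // 654775
    }
}
```

The window end is 10-minute aligned but lies only about 11 minutes after Beam's minimum representable timestamp, nowhere near September 2018. That hints (unverified) that the records read from the test file carry a default minimum timestamp rather than their event time, so they may all land in the same window.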

2.)
I am trying to group all related messages into a single Avro file per
window (YYYYMMDD-HHMM). When the pipeline does not fail on the
DirectRunner, the generated Avro files contain only 1 record each.

If I generate a pseudo-random name in *getFilenamePolicy*, everything
works, but I end up with hundreds of files, each containing 1 record.
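The trade-off between the two naming schemes can be illustrated with a toy model in plain Java. `NamingModel` is hypothetical, is not Beam code, and makes no claim about how Beam's `WriteFiles` schedules writes: it only shows that if the same (window, shard, numShards) triple is written more than once, a name built from those fields alone collides, while a random name never collides but yields one file per write.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Toy model, not Beam code: every "write" produces exactly one output file name.
final class NamingModel {
    // Deterministic: depends only on window and shard, like the policy in this post.
    static String deterministicName(long windowMaxTs, int shard, int numShards) {
        return "tracking-w-" + windowMaxTs + "-" + shard + "_" + numShards + ".avro";
    }

    // Pseudo-random: a fresh UUID per write, so names practically never repeat.
    static String randomName(long windowMaxTs) {
        return "tracking-w-" + windowMaxTs + "-" + UUID.randomUUID() + ".avro";
    }

    // Counts writes whose name was already taken
    // (the analogue of FileAlreadyExistsException in the real pipeline).
    static int collisions(List<String> names) {
        Set<String> seen = new HashSet<>();
        int clashes = 0;
        for (String name : names) {
            if (!seen.add(name)) {
                clashes++;
            }
        }
        return clashes;
    }

    public static void main(String[] args) {
        long windowMaxTs = 1536661199999L; // arbitrary window max timestamp
        List<String> deterministic = new ArrayList<>();
        List<String> random = new ArrayList<>();
        // Hypothetical: three writes target the same window and shard.
        for (int write = 0; write < 3; write++) {
            deterministic.add(deterministicName(windowMaxTs, 0, 1));
            random.add(randomName(windowMaxTs));
        }
        System.out.println("deterministic collisions: " + collisions(deterministic)); // 2
        System.out.println("random collisions: " + collisions(random) + ", files: " + random.size());
    }
}
```

What produces repeated triples in the real pipeline is the open question here; the model only shows why the two naming choices lead to the two observed symptoms.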

My pipeline steps are:
-- ReadFromSource
-- ApplyWindows (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
-- Create a KV from the KafkaRecord (the key is the date as YYYYMMDDHHMM)
-- GroupByKey (this returns a KV<Long, Iterable<String>>)
-- Emit each record of the iterable as a KV<Long, String>
-- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
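The key-building step could look roughly like the following plain-Java sketch. The helper `WindowKey.toKey` and its `bucket` parameter are hypothetical, and UTC is assumed; it floors the event time to the bucket before formatting it as `yyyyMMddHHmm` and parsing it into a `Long`.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns an event time into a YYYYMMDDHHMM Long key (UTC assumed).
final class WindowKey {
    private static final DateTimeFormatter KEY_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    // Floors the timestamp to the start of its bucket, then formats it.
    static long toKey(Instant eventTime, Duration bucket) {
        long size = bucket.toMillis();
        long millis = eventTime.toEpochMilli();
        long start = millis - Math.floorMod(millis, size);
        return Long.parseLong(KEY_FORMAT.format(Instant.ofEpochMilli(start)));
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2018-09-11T10:27:45Z");
        System.out.println(toKey(t, Duration.ofMinutes(1)));  // 201809111027
        System.out.println(toKey(t, Duration.ofMinutes(10))); // 201809111020
    }
}
```

With a 1-minute bucket, a single 10-minute window can contain up to ten distinct keys, so GroupByKey emits up to ten groups per window; flooring to the window size gives one key per window.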

Below is the code for getFilenamePolicy:
{code}
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
    return new FileBasedSink.FilenamePolicy() {
        @Override
        public ResourceId windowedFilename(final int shardNumber, final int numShards,
                final BoundedWindow window, final PaneInfo paneInfo,
                final FileBasedSink.OutputFileHints outputFileHints) {

            // Base directory, with a trailing slash.
            StringBuilder path = new StringBuilder(filesLocation);
            if (!filesLocation.endsWith("/")) {
                path.append("/");
            }

            // <location>/yyyy/MM/<prefix>-day-dd-w-<windowMaxTs>-<shard>_<numShards><suffix>
            path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
                    .append("/")
                    .append(filesPrefix)
                    .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
                    .append("-w-").append(window.maxTimestamp().getMillis())
                    .append("-").append(shardNumber)
                    .append("_").append(numShards)
                    .append(AVRO_SUFFIX);

            return FileSystems.matchNewResource(path.toString(), false);
        }

        @Nullable
        @Override
        public ResourceId unwindowedFilename(final int shardNumber, final int numShards,
                final FileBasedSink.OutputFileHints outputFileHints) {
            throw new PipelineException("unwindowedFilename is not supported");
        }
    };
}
{code}
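To test the naming logic without a runner, the path construction in `windowedFilename` could be pulled out into a pure function. This is a hypothetical Beam-free sketch (`TrackingNames.buildName`) that assumes UTC, uses `java.time` in place of the `DateTimeUtil` helper, and assumes `AVRO_SUFFIX` is `".avro"`. It also adds a pane-index component, which in Beam would come from `paneInfo.getIndex()`; whether multiple panes are actually what collides here is not established.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical, Beam-free version of the path logic in windowedFilename.
// Assumes UTC and AVRO_SUFFIX = ".avro".
final class TrackingNames {
    private static final DateTimeFormatter YEAR_MONTH =
        DateTimeFormatter.ofPattern("yyyy/MM").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("dd").withZone(ZoneOffset.UTC);
    private static final String AVRO_SUFFIX = ".avro";

    static String buildName(String filesLocation, String filesPrefix, Instant destinationTs,
                            long windowMaxTs, long paneIndex, int shard, int numShards) {
        StringBuilder path = new StringBuilder(filesLocation);
        if (!filesLocation.endsWith("/")) {
            path.append('/');
        }
        path.append(YEAR_MONTH.format(destinationTs)).append('/')
            .append(filesPrefix)
            .append("-day-").append(DAY.format(destinationTs))
            .append("-w-").append(windowMaxTs)
            // Not in the original policy: pane index (e.g. paneInfo.getIndex() in Beam).
            .append("-p").append(paneIndex)
            .append('-').append(shard)
            .append('_').append(numShards)
            .append(AVRO_SUFFIX);
        return path.toString();
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2018-09-11T10:27:45Z");
        System.out.println(buildName("/tmp/test-tracking", "tracking", ts, 1536661199999L, 0, 0, 1));
    }
}
```

For example, the call in `main` returns `/tmp/test-tracking/2018/09/tracking-day-11-w-1536661199999-p0-0_1.avro`, and the same inputs with pane index 1 give a different name.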

Thanks
-- 

JC
