Hi all,

We recently implemented a feature in our streaming Flink job in which we build a 
new AvroParquetWriter every time the overridden “write” method from 
org.apache.flink.streaming.connectors.fs.Writer is called. We had to do this 
because the schema of each record can differ, so we first have to extract the 
schema for the AvroParquetWriter from the record itself. Previously the writer 
was built only once, in the “open” method, and from then on only the write 
method was called per record.

Since implementing this change, our job has been crashing with “Connection 
unexpectedly closed by remote task manager ‘internal company url’. This might 
indicate that the remote task manager was lost.”

We did not run into any issues in our test environments, so we suspect the 
problem only occurs under higher load, such as we have in our production 
environment. Unfortunately we still don’t have a proper means of reproducing 
that much load in our test environment to debug.

Could building the AvroParquetWriter on every write be causing the problem, 
and if so, why would that be the case?
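
One detail we are wondering about: in the current code the previous writer is 
never closed before we overwrite the reference with a newly built one. Would 
something along these lines be the right direction (just a sketch, simplified 
from the class below)?

  // Close the previous writer first so its buffers and the underlying
  // output stream are released before a new writer is built.
  if (parquetWriter != null) {
    parquetWriter.close();
  }
  parquetWriter = writerBuilder.build();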

Any help in getting to the bottom of this issue would be really appreciated. 
Below is a code snippet of the class that uses the AvroParquetWriter.

Best regards,
Ivan Budincevic
Software engineer, bol.com
Netherlands

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  private transient ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    // The writer can no longer be built here: the Avro schema is only
    // known once the first record of a slot arrives in write().
    this.path = path;
  }

  // Both progress callbacks report the current writer's data size.
  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {
    // Build a fresh writer for every slot, since the schema has to come
    // from the records themselves. Note that the previous writer is not
    // closed before the reference is replaced.
    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
      AvroParquetWriter
        .<GenericRecord>builder(path)
        .withSchema(slot.getMeasurements().get(0).getSchema())
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .withDictionaryEncoding(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }

    parquetWriter = writerBuilder.build();

    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }

  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}

