Hi Flavio,

If I understand correctly, I think you bumped into this issue: 
https://issues.apache.org/jira/browse/FLINK-2646 
<https://issues.apache.org/jira/browse/FLINK-2646>

There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
 
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468>

Kostas

> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> Hi to all,
> I'm trying to test a streaming job but the files written by the BucketingSink 
> are never finalized (they remain in the pending state).
> Is this caused by the fact that the job finishes before the checkpoint?
> Shouldn't the sink properly close anyway?
> 
> This is my code:
> 
>   @Test
>   public void testBucketingSink() throws Exception {
>     final StreamExecutionEnvironment senv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     final StreamTableEnvironment tEnv = 
> TableEnvironment.getTableEnvironment(senv);
>     senv.enableCheckpointing(5000); // NOTE(review): interval presumably in ms; this five-element source likely finishes before the first checkpoint - confirm
>     DataStream<String> testStream = senv.fromElements(//
>         "1,aaa,white", //
>         "2,bbb,gray", //
>         "3,ccc,white", //
>         "4,bbb,gray", //
>         "5,bbb,gray" //
>     );
>     final RowTypeInfo rtf = new RowTypeInfo(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO, 
>         BasicTypeInfo.STRING_TYPE_INFO);
>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
> 
>       private static final long serialVersionUID = 1L;
> 
>       @Override
>       public Row map(String str) throws Exception {
>         String[] split = str.split(Pattern.quote(",")); // split the CSV line into its three fields
>         Row ret = new Row(3);
>         ret.setField(0, split[0]);
>         ret.setField(1, split[1]);
>         ret.setField(2, split[2]);
>         return ret;
>       }
>     }).returns(rtf);
> 
>     String columnNames = "id,value,state";
>     final String dsName = "test";
>     tEnv.registerDataStream(dsName, rows, columnNames);
>     final String whiteAreaFilter = "state = 'white'";
>     DataStream<Row> grayArea = rows;
>     DataStream<Row> whiteArea = null;
>     if (whiteAreaFilter != null) { // always true here: whiteAreaFilter is assigned a constant just above
>       String sql = "SELECT *, (%s) as _WHITE FROM %s";
>       sql = String.format(sql, whiteAreaFilter, dsName); // adds the filter result as a boolean _WHITE column
>       Table table = tEnv.sql(sql);
>       grayArea = 
> tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>       DataStream<Row> nw = 
> tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>       whiteArea = whiteArea == null ? nw : whiteArea.union(nw); // whiteArea is still null at this point, so this assigns nw
>     }
>     Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n"); // the same writer instance is handed to both sinks below
> 
>     String datasetWhiteDir = "/tmp/bucket/white";
>     BucketingSink<Row> whiteAreaSink = new 
> BucketingSink<>(datasetWhiteDir.toString());
>     whiteAreaSink.setWriter(bucketSinkwriter);
>     whiteAreaSink.setBatchSize(10); // NOTE(review): presumably a size in bytes per the BucketingSink docs - confirm
>     whiteArea.addSink(whiteAreaSink);
> 
>     String datasetGrayDir = "/tmp/bucket/gray";
>     BucketingSink<Row> grayAreaSink = new 
> BucketingSink<>(datasetGrayDir.toString());
>     grayAreaSink.setWriter(bucketSinkwriter);
>     grayAreaSink.setBatchSize(10);
>     grayArea.addSink(grayAreaSink);
> 
>     JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
>     System.out.printf("Job took %s minutes", 
> jobInfo.getNetRuntime(TimeUnit.MINUTES));
>   }
> 
> 
> 
> 
> 
> 
> 
> public class RowCsvWriter extends StreamWriterBase<Row> {
>   private static final long serialVersionUID = 1L;
> 
>   private final String charsetName;
>   private transient Charset charset; // resolved from charsetName in open(); transient, so not part of the serialized writer
>   private String fieldDelimiter;
>   private String recordDelimiter;
>   private boolean allowNullValues = true;
>   private boolean quoteStrings = false;
> 
>   /**
>    * Creates a new {@code RowCsvWriter} that uses the {@code "UTF-8"} charset to convert
>    * strings to bytes, together with the default field and line delimiters of
>    * {@code CsvOutputFormat}.
>    */
>   public RowCsvWriter() {
>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, 
> CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>   }
> 
>   /**
>    * Creates a new {@code RowCsvWriter} that uses the given charset to convert strings to
>    * bytes.
>    *
>    * @param charsetName Name of the charset to be used, must be valid input for
>    *        {@code Charset.forName(charsetName)}
>    * @param fieldDelimiter String written between the fields of a row
>    * @param recordDelimiter String written after the last field of each row
>    */
>   public RowCsvWriter(String charsetName, String fieldDelimiter, String 
> recordDelimiter) {
>     this.charsetName = charsetName;
>     this.fieldDelimiter = fieldDelimiter;
>     this.recordDelimiter = recordDelimiter;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     super.open(fs, path);
> 
>     try {
>       this.charset = Charset.forName(charsetName);
>     } catch (IllegalCharsetNameException ex) {
>       throw new IOException("The charset " + charsetName + " is not valid.", 
> ex);
>     } catch (UnsupportedCharsetException ex) {
>       throw new IOException("The charset " + charsetName + " is not 
> supported.", ex);
>     }
>   }
> 
>   @Override
>   public void write(Row element) throws IOException {
>     FSDataOutputStream outputStream = getStream();
>     writeRow(element, outputStream);
>   }
> 
>   private void writeRow(Row element, FSDataOutputStream out) throws 
> IOException {
>     int numFields = element.getArity();
> 
>     for (int i = 0; i < numFields; i++) {
>       Object obj = element.getField(i);
>       if (obj != null) {
>         if (i != 0) { // the delimiter precedes every field except the first
>           out.write(this.fieldDelimiter.getBytes(charset));
>         }
> 
>         if (quoteStrings) {
>           if (obj instanceof String || obj instanceof StringValue) {
>             out.write('"');
>             out.write(obj.toString().getBytes(charset));
>             out.write('"');
>           } else {
>             out.write(obj.toString().getBytes(charset));
>           }
>         } else {
>           out.write(obj.toString().getBytes(charset));
>         }
>       } else {
>         if (this.allowNullValues) { // a null field is emitted as an empty value: only the delimiter is written
>           if (i != 0) {
>             out.write(this.fieldDelimiter.getBytes(charset));
>           }
>         } else {
>           throw new RuntimeException("Cannot write tuple with <null> value at 
> position: " + i);
>         }
>       }
>     }
> 
>     // add the record delimiter
>     out.write(this.recordDelimiter.getBytes(charset));
>   }
> 
>   @Override
>   public Writer<Row> duplicate() {
>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter); // the copy carries only the charset name and the two delimiters
>   }
> }
> 
> 
> 
> Any help is appreciated,
> Flavio

Reply via email to