Hi Flavio, If I understand correctly, I think you bumped into this issue: https://issues.apache.org/jira/browse/FLINK-2646
There is also a similar discussion on the BucketingSink here: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468 <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468> Kostas > On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote: > > Hi to all, > I'm trying to test a streaming job but the files written by the BucketingSink > are never finalized (remains into the pending state). > Is this caused by the fact that the job finishes before the checkpoint? > Shouldn't the sink properly close anyway? > > This is my code: > > @Test > public void testBucketingSink() throws Exception { > final StreamExecutionEnvironment senv = > StreamExecutionEnvironment.getExecutionEnvironment(); > final StreamTableEnvironment tEnv = > TableEnvironment.getTableEnvironment(senv); > senv.enableCheckpointing(5000); > DataStream<String> testStream = senv.fromElements(// > "1,aaa,white", // > "2,bbb,gray", // > "3,ccc,white", // > "4,bbb,gray", // > "5,bbb,gray" // > ); > final RowTypeInfo rtf = new RowTypeInfo( > BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO); > DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() { > > private static final long serialVersionUID = 1L; > > @Override > public Row map(String str) throws Exception { > String[] split = str.split(Pattern.quote(",")); > Row ret = new Row(3); > ret.setField(0, split[0]); > ret.setField(1, split[1]); > ret.setField(2, split[2]); > return ret; > } > }).returns(rtf); > > String columnNames = "id,value,state"; > final String dsName = "test"; > tEnv.registerDataStream(dsName, rows, columnNames); > final String whiteAreaFilter = "state = 'white'"; > DataStream<Row> grayArea = rows; > DataStream<Row> whiteArea = null; > if (whiteAreaFilter != null) { > String sql = "SELECT *, (%s) as 
_WHITE FROM %s"; > sql = String.format(sql, whiteAreaFilter, dsName); > Table table = tEnv.sql(sql); > grayArea = > tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf); > DataStream<Row> nw = > tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf); > whiteArea = whiteArea == null ? nw : whiteArea.union(nw); > } > Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n"); > > String datasetWhiteDir = "/tmp/bucket/white"; > BucketingSink<Row> whiteAreaSink = new > BucketingSink<>(datasetWhiteDir.toString()); > whiteAreaSink.setWriter(bucketSinkwriter); > whiteAreaSink.setBatchSize(10); > whiteArea.addSink(whiteAreaSink); > > String datasetGrayDir = "/tmp/bucket/gray"; > BucketingSink<Row> grayAreaSink = new > BucketingSink<>(datasetGrayDir.toString()); > grayAreaSink.setWriter(bucketSinkwriter); > grayAreaSink.setBatchSize(10); > grayArea.addSink(grayAreaSink); > > JobExecutionResult jobInfo = senv.execute("Buketing sink test "); > System.out.printf("Job took %s minutes", > jobInfo.getNetRuntime(TimeUnit.MINUTES)); > } > > > > > > > > public class RowCsvWriter extends StreamWriterBase<Row> { > private static final long serialVersionUID = 1L; > > private final String charsetName; > private transient Charset charset; > private String fieldDelimiter; > private String recordDelimiter; > private boolean allowNullValues = true; > private boolean quoteStrings = false; > > /** > * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to > convert strings to > * bytes. > */ > public RowCsvWriter() { > this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, > CsvOutputFormat.DEFAULT_LINE_DELIMITER); > } > > /** > * Creates a new {@code StringWriter} that uses the given charset to > convert strings to bytes. 
> * > * @param charsetName Name of the charset to be used, must be valid input > for > * {@code Charset.forName(charsetName)} > */ > public RowCsvWriter(String charsetName, String fieldDelimiter, String > recordDelimiter) { > this.charsetName = charsetName; > this.fieldDelimiter = fieldDelimiter; > this.recordDelimiter = recordDelimiter; > } > > @Override > public void open(FileSystem fs, Path path) throws IOException { > super.open(fs, path); > > try { > this.charset = Charset.forName(charsetName); > } catch (IllegalCharsetNameException ex) { > throw new IOException("The charset " + charsetName + " is not valid.", > ex); > } catch (UnsupportedCharsetException ex) { > throw new IOException("The charset " + charsetName + " is not > supported.", ex); > } > } > > @Override > public void write(Row element) throws IOException { > FSDataOutputStream outputStream = getStream(); > writeRow(element, outputStream); > } > > private void writeRow(Row element, FSDataOutputStream out) throws > IOException { > int numFields = element.getArity(); > > for (int i = 0; i < numFields; i++) { > Object obj = element.getField(i); > if (obj != null) { > if (i != 0) { > out.write(this.fieldDelimiter.getBytes(charset)); > } > > if (quoteStrings) { > if (obj instanceof String || obj instanceof StringValue) { > out.write('"'); > out.write(obj.toString().getBytes(charset)); > out.write('"'); > } else { > out.write(obj.toString().getBytes(charset)); > } > } else { > out.write(obj.toString().getBytes(charset)); > } > } else { > if (this.allowNullValues) { > if (i != 0) { > out.write(this.fieldDelimiter.getBytes(charset)); > } > } else { > throw new RuntimeException("Cannot write tuple with <null> value at > position: " + i); > } > } > } > > // add the record delimiter > out.write(this.recordDelimiter.getBytes(charset)); > } > > @Override > public Writer<Row> duplicate() { > return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter); > } > } > > > > Any help is appreciated, > Flavio