Re: Questions about UDTF in flink SQL
Hi Rong,

Yes, what Jark described is exactly what I need. Currently we have a workaround for this problem, using a UDF whose result type is a Map (a minimal sketch is included after the quoted thread below). I will take a look at your proposals and PR.

Thanks for your help and suggestions.

Best,
Wangsan

> On Dec 1, 2018, at 7:30 AM, Rong Rong wrote:
>
> Hi Wangsan,
>
> If your requirement is essentially what Jark described, we already have a proposal following up [FLINK-9249] in its related/parent task: [FLINK-9484]. We are already implementing some of these internally and have one PR ready for review for FLINK-9294.
>
> Please kindly take a look and see if there are any additional features you would like to comment on and suggest.
>
> Thanks,
> Rong
>
> On Fri, Nov 30, 2018 at 1:54 AM Jark Wu <imj...@gmail.com> wrote:
>
> Hi Wangsan,
>
> If I understand correctly, you want the return type of a UDTF to be determined by the actual arguments, not a fixed result type. For example:
>
> udtf("int, string, long", inputField) returns a composite type with [f0: INT, f1: VARCHAR, f2: BIGINT]
> udtf("int", inputField) returns an atomic type with [f0: INT]
>
> This is an interesting and useful feature IMO. But it may need some modification to the current API of TableFunction to provide an additional `TypeInformation[T] getResultType(Object[] arguments, Class[] argTypes)` interface, which means it needs more discussion in the community.
>
> But you can create an issue if this is what you want and we can discuss how to support it.
>
> Best,
> Jark
>
> On Thu, 29 Nov 2018 at 19:14, Timo Walther <twal...@apache.org> wrote:
>
> Hi Wangsan,
>
> Currently, UDFs have very strict result type assumptions. This is necessary to determine the serializers for the cluster. There were multiple requests for more flexible handling of types in UDFs.
>
> Please have a look at:
> - [FLINK-7358] Add implicitly converts support for User-defined function
> - [FLINK-9294] [table] Improve type inference for UDFs with composite parameter and/or result type
> - [FLINK-10958] [table] Add overload support for user defined function
>
> If you think those issues do not represent what you need, you can open a new issue with a little example of what feature you think is missing.
>
> Regards,
> Timo
>
> Am 28.11.18 um 09:59 schrieb wangsan:
> > Hi all,
> >
> > When using a user-defined table function in Flink SQL, it seems that the result type of a table function must be deterministic.
> >
> > If I want a UDTF whose result type is determined by its input parameters, what should I do?
> >
> > What I want to do is like this:
> >
> > ```
> > SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, f5)
> > ```
> >
> > I can surely register the same UDTF with different names and configurations, but I guess that's not a good idea :(.
> >
> > If we cannot make this work in Flink SQL for now, maybe we should consider this feature in the future?
> >
> > Best,
> > wangsan
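For reference, here is a minimal sketch of the Map-based workaround mentioned above. The class name and the extraction logic are placeholders, not our production code; the point is only that returning a MAP keeps the declared result type fixed no matter how many fields the caller asks for.

```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

// Illustrative sketch only: a scalar UDF that packs the requested fields into a
// MAP<STRING, STRING>, so its declared result type never changes.
public class UnnestAsMap extends ScalarFunction {

    public Map<String, String> eval(String input, String... fields) {
        Map<String, String> result = new HashMap<>();
        for (String field : fields) {
            // real extraction logic is application specific; this is a placeholder
            result.put(field, input + ":" + field);
        }
        return result;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        // declare the fixed MAP type explicitly instead of relying on type extraction
        return Types.MAP(Types.STRING, Types.STRING);
    }
}
```

The downside, compared to a real UDTF with argument-dependent types, is that every consumer has to pull values out of the map and cast them afterwards.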
Questions about UDTF in flink SQL
Hi all,

When using a user-defined table function in Flink SQL, it seems that the result type of a table function must be deterministic.

If I want a UDTF whose result type is determined by its input parameters, what should I do?

What I want to do is like this:

```
SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, f5)
```

I can surely register the same UDTF with different names and configurations, but I guess that's not a good idea :(.

If we cannot make this work in Flink SQL for now, maybe we should consider this feature in the future?

Best,
wangsan
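For completeness, a rough sketch of what registering the same UDTF under different names and configurations could look like. ConfiguredUnnest and its constructor argument are hypothetical; the point is only that each registration carries its own fixed, pre-configured result type.

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Illustrative sketch only: one TableFunction instance per desired schema.
public class ConfiguredUnnest extends TableFunction<Row> {

    private final TypeInformation<Row> resultType;

    public ConfiguredUnnest(TypeInformation<Row> resultType) {
        this.resultType = resultType;
    }

    public void eval(String input, String... values) {
        Row row = new Row(values.length);
        for (int i = 0; i < values.length; i++) {
            row.setField(i, values[i]);
        }
        collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return resultType;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // one registration per schema, usable side by side in the query above
        tEnv.registerFunction("unnest_udtf_2",
                new ConfiguredUnnest(Types.ROW(Types.STRING, Types.STRING)));
        tEnv.registerFunction("unnest_udtf_3",
                new ConfiguredUnnest(Types.ROW(Types.STRING, Types.STRING, Types.STRING)));
    }
}
```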
Side effect of DataStreamRel#translateToPlan
Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan to be different from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again. Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,
Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))

We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Map
    ship_strategy : FORWARD

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

Stage 5 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 6 : Operator
    content : Map
    ship_strategy : FORWARD

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

Stage 5 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 6 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 7 : Data Source
    content : collect elements with CollectionInputFormat

Stage 8 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 9 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 10 : Operator
    content : to: Row
    ship_strategy : FORWARD

Stage 11 : Data Source
    content : collect elements with CollectionInputFormat

Stage 12 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 13 : Operator
    content : Map
    ship_strategy : FORWARD
Re: Confusions About JDBCOutputFormat
Well, I see. If the connection is established when writing data into the DB, we need to cache the rows received since the last write. IMO, maybe we do not need to open connections repeatedly or introduce connection pools. Testing and refreshing the connection periodically can simply solve this problem (a small sketch follows the quoted thread below). I've implemented this at https://github.com/apache/flink/pull/6301, it would be kind of you to review it.

Best,
wangsan

> On Jul 11, 2018, at 2:25 PM, Hequn Cheng wrote:
>
> Hi wangsan,
>
> What I mean is establishing a connection each time we write data into JDBC, i.e. establishing a connection in the flush() function. I think this will make sure the connection is ok. What do you think?
>
> On Wed, Jul 11, 2018 at 12:12 AM, wangsan <wamg...@163.com> wrote:
>
>> Hi Hequn,
>>
>> Establishing a connection for each batch write may also have the idle connection problem, since we are not sure when the connection will be closed. We call the flush() method when a batch is finished or when snapshotting state, but what if snapshotting is not enabled and the batch size is not reached before the connection is closed?
>>
>> Maybe we could use a Timer to test the connection periodically and keep it alive. What do you think?
>>
>> I will open a jira and try to work on that issue.
>>
>> Best,
>> wangsan
>>
>> On Jul 10, 2018, at 8:38 PM, Hequn Cheng wrote:
>>
>> Hi wangsan,
>>
>> I agree with you. It would be kind of you to open a jira to check the problem.
>>
>> For the first problem, I think we need to establish a connection each time we execute a batch write. And it is better to get the connection from a connection pool.
>> For the second problem, to avoid the multithreading problem, I think we should synchronize on the batch object in the flush() method.
>>
>> What do you think?
>>
>> Best, Hequn
>>
>> On Tue, Jul 10, 2018 at 2:36 PM, wangsan <wamg...@163.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink application. But I am confused with the implementation of JDBCOutputFormat.
>>>
>>> 1. The Connection is established when JDBCOutputFormat is opened, and is used the whole time. But if this connection lies idle for a long time, the database will force-close the connection, and errors may occur.
>>> 2. The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state. So two threads may modify upload and batchCount, but without synchronization.
>>>
>>> Please correct me if I am wrong.
>>>
>>> ——
>>> wangsan
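To make the idea concrete, a minimal sketch of the test-and-refresh approach in plain JDBC. It is not the code in the PR above; the class, the field names, and the 5-second validity timeout are illustrative.

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative only: check the cached connection before every batch write and
// re-establish it if the database has closed it in the meantime.
public class RefreshingJdbcWriter {

    private final String dbURL;
    private final String insertQuery;
    private Connection connection;
    private PreparedStatement upload;

    public RefreshingJdbcWriter(String dbURL, String insertQuery) {
        this.dbURL = dbURL;
        this.insertQuery = insertQuery;
    }

    // Called from flush() (or on a timer) before executing the batched statement.
    void ensureConnectionValid() throws SQLException {
        // Connection#isValid is part of JDBC 4; the argument is a timeout in seconds.
        if (connection == null || !connection.isValid(5)) {
            closeQuietly();
            connection = DriverManager.getConnection(dbURL);
            upload = connection.prepareStatement(insertQuery);
            // any rows added to the old statement's batch are gone at this point,
            // which is why the rows received since the last write must be cached
        }
    }

    private void closeQuietly() {
        try {
            if (upload != null) {
                upload.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException ignored) {
            // the connection is already broken; nothing useful to do here
        }
    }
}
```

Note that after a reconnect the rows already batched on the old statement are lost, which is exactly why the rows received since the last successful write need to be cached, as mentioned above.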
Re: Confusions About JDBCOutputFormat
Hi Hequn,

Establishing a connection for each batch write may also have the idle connection problem, since we are not sure when the connection will be closed. We call the flush() method when a batch is finished or when snapshotting state, but what if snapshotting is not enabled and the batch size is not reached before the connection is closed?

Maybe we could use a Timer to test the connection periodically and keep it alive (a rough sketch follows below). What do you think?

I will open a jira and try to work on that issue.

Best,
wangsan

> On Jul 10, 2018, at 8:38 PM, Hequn Cheng wrote:
>
> Hi wangsan,
>
> I agree with you. It would be kind of you to open a jira to check the problem.
>
> For the first problem, I think we need to establish a connection each time we execute a batch write. And it is better to get the connection from a connection pool.
> For the second problem, to avoid the multithreading problem, I think we should synchronize on the batch object in the flush() method.
>
> What do you think?
>
> Best, Hequn
>
> On Tue, Jul 10, 2018 at 2:36 PM, wangsan <wamg...@163.com> wrote:
>
> Hi all,
>
> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink application. But I am confused with the implementation of JDBCOutputFormat.
>
> 1. The Connection is established when JDBCOutputFormat is opened, and is used the whole time. But if this connection lies idle for a long time, the database will force-close the connection, and errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state. So two threads may modify upload and batchCount, but without synchronization.
>
> Please correct me if I am wrong.
>
> ——
> wangsan
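A minimal sketch of the Timer idea, using only java.util.Timer and plain JDBC; the URL, the check period, and the 5-second validity timeout are placeholders, not code from JDBCOutputFormat.

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;

// Illustrative only: periodically test the cached JDBC connection and re-open it
// if the database has closed it.
public class ConnectionKeepAlive {

    private final String dbURL;
    private final Timer timer = new Timer("jdbc-keep-alive", true);
    private volatile Connection connection;

    public ConnectionKeepAlive(String dbURL) throws SQLException {
        this.dbURL = dbURL;
        this.connection = DriverManager.getConnection(dbURL);
    }

    public void start(long periodMillis) {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (!connection.isValid(5)) {
                        connection = DriverManager.getConnection(dbURL);
                    }
                } catch (SQLException e) {
                    // leave the stale connection in place; the next write will fail and retry
                }
            }
        }, periodMillis, periodMillis);
    }

    public Connection getConnection() {
        return connection;
    }

    public void stop() {
        timer.cancel();
    }
}
```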
Confusions About JDBCOutputFormat
Hi all,

I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink application. But I am confused with the implementation of JDBCOutputFormat.

1. The Connection is established when JDBCOutputFormat is opened, and is used the whole time. But if this connection lies idle for a long time, the database will force-close the connection, and errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state. So two threads may modify upload and batchCount, but without synchronization.

Please correct me if I am wrong.

——
wangsan
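For the second point, a rough sketch of what synchronizing both call paths could look like. It loosely mimics JDBCOutputFormat's upload/batchCount fields but is illustrative, not the actual Flink class: the batch-size path (writeRecord) and the snapshot path both go through flush(), so guarding them with one lock keeps the two threads from racing.

```
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative only: serialize access to the batched statement and the counter.
public class SynchronizedBatchWriter {

    private final PreparedStatement upload;
    private final int batchInterval;
    private int batchCount = 0;

    public SynchronizedBatchWriter(Connection connection, String insertQuery, int batchInterval)
            throws SQLException {
        this.upload = connection.prepareStatement(insertQuery);
        this.batchInterval = batchInterval;
    }

    // Called by the task thread for every record.
    public synchronized void writeRecord(Object[] fields) throws SQLException {
        for (int i = 0; i < fields.length; i++) {
            upload.setObject(i + 1, fields[i]);
        }
        upload.addBatch();
        batchCount++;
        if (batchCount >= batchInterval) {
            flush();
        }
    }

    // Also called from the snapshotting path; synchronized so it cannot
    // interleave with writeRecord() on another thread.
    public synchronized void flush() throws SQLException {
        upload.executeBatch();
        batchCount = 0;
    }
}
```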
Re: Question about Timestamp in Flink SQL
Hi Timo,

What I am doing is extracting a timestamp field (which may be a string such as "2017-11-28 11:00:00", or a long value based on my current timezone) as the event time attribute. So in timestampAndWatermarkAssigner, for the string format I should parse the datetime string using GMT, and for the long value I should add the offset, the opposite of what internalToTimestamp does (a small sketch of both conversions follows the quoted thread below). But the processing time attribute cannot be kept consistent. Am I understanding that correctly?

Best,
wangsan

> On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Wangsan,
>
> Currently the timestamps in Flink SQL do not depend on a timezone. All calculations happen on the UTC timestamp. This also guarantees that an input with Timestamp.valueOf("XXX") remains consistent when parsing and outputting it with toString().
>
> Regards,
> Timo
>
> Am 11/29/17 um 3:43 AM schrieb wangsan:
>> Hi Xingcan,
>>
>> Thanks for your reply.
>>
>> The system default timezone is just what I expected (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
>>
>> I looked into the generated code, and I found the following code snippet:
>>
>> ```
>> result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
>> ```
>>
>> And what the `internalToTimestamp` function does is:
>>
>> ```
>> public static Timestamp internalToTimestamp(long v) {
>>   return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
>> }
>> ```
>>
>> So, if I give it an event time with unix timestamp 0, then I get Timestamp(-28800000). I am confused why `internalToTimestamp` needs to subtract the offset?
>>
>> Best,
>> wangsan
>>
>>> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>> Hi wangsan,
>>>
>>> In Flink, the ProcessingTime is just implemented by invoking System.currentTimeMillis() and the long value will be automatically wrapped to a Timestamp with the following statement:
>>>
>>> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>>>
>>> You can check your TimeZone.getDefault() to see if it returns the right TimeZone. Generally, the returned value should rely on the default TimeZone of your operating system.
>>>
>>> Hope that helps.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com> wrote:
>>>
>>> Hi all,
>>>
>>> While using Timestamp in Flink SQL, how can I set timezone info? Since my current timezone is GMT+8, I found the selected processing time is always 8 hours later than the current time, and so is the extracted event time.
>>>
>>> Here's my simplified code:
>>>
>>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>> senv.setParallelism(1)
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>>
>>> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
>>> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
>>>
>>> val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost", ).map(line => (line, line, line))
>>> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
>>> sTableEnv.registerTable("foo", table)
>>> val result = sTableEnv.sql("select * from foo")
>>> result.printSchema()
>>> result.toAppendStream[Row].print()
>>>
>>> senv.execute("foo")
>>>
>>> And here's the result:
>>>
>>> Best,
>>> wangsan
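To make the compensation concrete, a small sketch of the two conversions I mean (plain Java, illustrative only, not code from my job): parse the datetime string as if it were GMT so that the later internalToTimestamp() subtraction cancels out, and for epoch-millis values add the local offset before handing the timestamp to Flink.

```
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Illustrative helpers used from the timestamp/watermark assigner.
public class EventTimeCompensation {

    // "2017-11-28 11:00:00" interpreted as GMT, not the local (GMT+8) zone.
    public static long fromDateTimeString(String s) throws ParseException {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
        return fmt.parse(s).getTime();
    }

    // Epoch millis based on the local zone: add the offset that
    // internalToTimestamp() will later subtract.
    public static long fromEpochMillis(long v) {
        return v + TimeZone.getDefault().getOffset(v);
    }
}
```

With GMT+8 as the default zone, an epoch-millis value of 0 is handed to Flink as 28800000, so the subtraction inside internalToTimestamp brings it back to 0 when the row is emitted.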
Re: Question about Timestamp in Flink SQL
Hi Xingcan,

Thanks for your reply.

The system default timezone is just what I expected (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).

I looked into the generated code, and I found the following code snippet:

```
result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And what the `internalToTimestamp` function does is:

```
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, then I get Timestamp(-28800000). I am confused why `internalToTimestamp` needs to subtract the offset?

Best,
wangsan

> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi wangsan,
>
> In Flink, the ProcessingTime is just implemented by invoking System.currentTimeMillis() and the long value will be automatically wrapped to a Timestamp with the following statement:
>
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>
> You can check your TimeZone.getDefault() to see if it returns the right TimeZone. Generally, the returned value should rely on the default TimeZone of your operating system.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com> wrote:
>
> Hi all,
>
> While using Timestamp in Flink SQL, how can I set timezone info? Since my current timezone is GMT+8, I found the selected processing time is always 8 hours later than the current time, and so is the extracted event time.
>
> Here's my simplified code:
>
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
>
> val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost", ).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
>
> senv.execute("foo")
>
> And here's the result:
>
> Best,
> wangsan
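A tiny standalone sketch (not Flink code) that reproduces the number above by applying the same subtraction internalToTimestamp performs, with the default zone set to Asia/Shanghai:

```
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {
    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        long v = 0L; // event time as unix timestamp
        // same arithmetic as Calcite's internalToTimestamp()
        Timestamp ts = new Timestamp(v - TimeZone.getDefault().getOffset(v));
        System.out.println(ts.getTime()); // -28800000
        System.out.println(ts);           // 1970-01-01 00:00:00.0, printed in the local zone
    }
}
```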
Re: Hive integration in table API and SQL
Hi Timo,

Thanks for your reply.

I do notice that the document says "A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them."

Does that mean there is no way I can apply operations, like join, on a streaming table and a batch table?

Best,
wangsan

> On 20 Nov 2017, at 9:15 PM, Timo Walther <twal...@apache.org> wrote:
>
> Timo
Hive integration in table API and SQL
Hi all,

I am currently learning the Table API and SQL in Flink. I noticed that Flink does not support Hive tables as a table source, and even a JDBC table source is not provided. There are cases where we need to join a stream table with static Hive or other database tables to get more specific attributes, so how can I implement this functionality? Do I need to implement my own DataSet connectors to load data from external tables using JDBC and register the DataSet as a table, or should I provide an external catalog?

Thanks,
wangsan
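A rough sketch of the first option (loading via JDBC and registering the DataSet as a table), assuming the flink-jdbc JDBCInputFormat is available; the driver, URL, query, and schema are placeholders:

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

// Illustrative only: load a dimension table over JDBC and expose it as a Table.
public class JdbcDimensionTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")            // placeholder driver
                .setDBUrl("jdbc:mysql://localhost:3306/mydb")      // placeholder URL
                .setQuery("SELECT id, name FROM dim_table")        // placeholder query
                .setRowTypeInfo(new RowTypeInfo(Types.INT, Types.STRING))
                .finish();

        // Register the JDBC-backed DataSet so it can be joined with other tables in SQL.
        DataSet<Row> dimension = env.createInput(inputFormat);
        tEnv.registerDataSet("dim_table", dimension, "id, name");
    }
}
```

Note that this loads a static snapshot as a batch DataSet in a separate (batch) TableEnvironment, which is exactly what the documentation restriction quoted in the reply above is about.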
Re:Re: Exception in BucketingSink when cancelling Flink job
Hi,

The 'join' method can be called with a timeout (as is done in TaskCanceler), so it won't block forever if the respective thread is in a deadlocked state. Maybe calling 'interrupt()' after 'join(timeout)' is more reasonable (see the small sketch after the quoted thread below), although it still cannot guarantee that the operations inside the 'close()' method have finished.

Best,
wangsan

On Sep 29, 2017, at 01:52, "Stephan Ewen" <step...@data-artisans.com> wrote:

Hi!

Calling 'interrupt()' only makes sense before 'join()', because 'join()' blocks until the respective thread is finished. The 'interrupt()' call happens to cancel the task out of potentially blocking I/O or sleep/wait operations.

The problem is that HDFS does not handle interrupts correctly, it sometimes deadlocks in the case of interrupts on unclosed streams :-(

I think it would be important to make sure (in the Bucketing Sink) that the DFS streams are closed upon task cancellation.

@aljoscha - adding you to this thread, as you know most about the bucketing sink.

Best,
Stephan

On Wed, Sep 27, 2017 at 10:18 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:

Hi,

I would speculate that the reason for this order is that we want to shut down the tasks quickly by interrupting blocking calls in the event of failure, so that recovery can begin as fast as possible. I am looping in Stephan who might give more details about this code.

Best,
Stefan

Am 27.09.2017 um 07:33 schrieb wangsan <wamg...@163.com>:

After digging into the source code, we found that when a Flink job is cancelled, a TaskCanceler thread is created. The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}
// ...
executer.interrupt();
try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}
// ...

Notice that TaskCanceler first sends an interrupt signal to the task thread, and only then follows with the join method. Since the task thread is at that point trying to close the DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown in the task thread.

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on
                            // dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}

I am confused about why TaskCanceler calls executer.interrupt() before executer.join(interruptInterval). Can anyone help?

Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the Flink job is cancelled, we always get an exception in BucketingSink's close method. The detailed exception info is as below:

[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
    ...
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

It seems that the DFSOutputStream hasn't been closed before the task thread is forcibly terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cancelling a job?

Best,
wangsan
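A minimal sketch of the ordering suggested above (illustrative only, not the actual TaskCanceler code): give the task thread a bounded chance to finish close() cleanly, and only interrupt it if it is still alive afterwards.

```
// Illustrative only: join first with a timeout, interrupt second.
public final class GracefulTaskCancel {

    static void cancelThenInterrupt(Thread executer, long interruptInterval) throws InterruptedException {
        // bounded wait, so a deadlocked task thread cannot block the canceler forever
        executer.join(interruptInterval);
        if (executer.isAlive()) {
            // only now break the thread out of blocking I/O or wait()
            executer.interrupt();
            executer.join(interruptInterval);
        }
    }
}
```

Even with this ordering, if close() has not finished within the timeout the interrupt can still hit the HDFS ack wait, so it only reduces the window rather than removing it.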
Exception in BucketingSink when cancelling Flink job
Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the Flink job is cancelled, we always get an exception in BucketingSink's close method. The detailed exception info is as below:

[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
    ...
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

It seems that the DFSOutputStream hasn't been closed before the task thread is forcibly terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cancelling a job?

Best,
wangsan