Re: Questions about UDTF in flink SQL

2018-11-30 Thread wangsan
Hi Rong,

Yes, what Jark described is exactly what I need. Currently we have a workaround 
for this problem, using a UDF whose result type is a Map. I will take 
a look at your proposals and PR. 

Thanks for your help and suggestions.

Best,
Wangsan


> On Dec 1, 2018, at 7:30 AM, Rong Rong  wrote:
> 
> Hi Wangsan,
> 
> If your requirement is essentially what Jark described, we already have a proposal 
> following up [FLINK-9249] in its related/parent task: [FLINK-9484]. We are 
> already implementing some of these internally and have one PR ready for 
> review for FLINK-9294.
> 
> Please kindly take a look and see if there are any additional features you 
> would like to comment on or suggest.
> 
> Thanks,
> Rong
> 
> On Fri, Nov 30, 2018 at 1:54 AM Jark Wu  <mailto:imj...@gmail.com>> wrote:
> Hi Wangsan,
> 
> If I understand correctly, you want the return type of the UDTF to be determined 
> by the actual arguments, not a fixed result type. For example:
> 
> udtf("int, string, long", inputField) returns a composite type with [f0: 
> INT, f1: VARCHAR, f2: BIGINT]
> udtf("int", inputField) returns an atomic type with [f0: INT]
> 
> This is an interesting and useful feature IMO. But it may need some 
> modification of the current TableFunction API to
> provide an additional `TypeInformation[T] getResultType(Object[] arguments, 
> Class[] argTypes)` interface, which means it needs 
> more discussion in the community.
> 
> But you can create an issue if this is what you want and we can discuss how 
> to support it.
> 
> Best,
> Jark
> 
> 
> 
> On Thu, 29 Nov 2018 at 19:14, Timo Walther  <mailto:twal...@apache.org>> wrote:
> Hi Wangsan,
> 
> currently, UDFs have very strict result type assumptions. This is 
> necessary to determine the serializers for the cluster. There were 
> multiple requests for more flexible handling of types in UDFs.
> 
> Please have a look at:
> - [FLINK-7358] Add implicitly converts support for User-defined function
> - [FLINK-9294] [table] Improve type inference for UDFs with composite 
> parameter and/or result type
> - [FLINK-10958] [table] Add overload support for user defined function
> 
> If you think those issues do not represent what you need, you can open a 
> new issue with a little example of the feature you think is missing.
> 
> Regards,
> Timo
> 
> 
> Am 28.11.18 um 09:59 schrieb wangsan:
> > Hi all,
> >
> > When using a user-defined table function in Flink SQL, it seems that the 
> > result type of a table function must be deterministic.
> >
> > If I want a UDTF whose result type is determined by its input parameters, 
> > what should I do?
> >
> > What I want to do is like this:
> >
> > ```
> > SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, 
> > v1, v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as 
> > T(f3, f4, f5)
> > ```
> >
> > I can surely register the same UDTF with different names and configurations, 
> > but I guess that’s not a good idea :(.
> >
> > If we cannot do this in Flink SQL for now, maybe we should consider 
> > this feature in the future?
> >
> > Best,
> > wangsan
> 
> 



Questions about UDTF in flink SQL

2018-11-28 Thread wangsan
Hi all,

When using a user-defined table function in Flink SQL, it seems that the result 
type of a table function must be deterministic. 

If I want a UDTF whose result type is determined by its input parameters, what 
should I do?

What I want to do is like this:

```
SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, 
v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, 
f5)
```

I can surely register the same UDTF with different names and configurations, but 
I guess that’s not a good idea :(. 
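
For reference, the workaround looks roughly like the sketch below, parameterizing 
the field types at registration time. The class name, field names, and eval 
signature are only illustrative, and it assumes the current TableFunction API 
with an overridable getResultType():

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Illustrative sketch: the result type is fixed per registered instance,
// which is why the same UDTF has to be registered several times.
public class ConfigurableUnnestUdtf extends TableFunction<Row> {

    private final TypeInformation<?>[] fieldTypes;

    public ConfigurableUnnestUdtf(TypeInformation<?>... fieldTypes) {
        this.fieldTypes = fieldTypes;
    }

    // One eval(...) per arity we actually use; each emits a Row matching fieldTypes.
    public void eval(String input, String v1, String v2) {
        Row row = new Row(2);
        row.setField(0, v1);
        row.setField(1, v2);
        collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(fieldTypes);
    }
}
```

Each differently-typed use then needs its own registered instance (e.g. one per 
schema), which is exactly the duplication I would like to avoid.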

If we cannot do this in Flink SQL for now, maybe we should consider this 
feature in the future?

Best,
wangsan


Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, and that 
may cause the execution plan to differ from what we expect. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), we add the same operators to the execution 
environment repeatedly. 

Should we eliminate the side effect of DataStreamRel#translateToPlan ? 

Best,  Wangsan

appendix

// Register a CSV-backed source and query it.
tenv.registerTableSource("test_source", sourceTable)
val t = tenv.sqlQuery("SELECT * from test_source")

// Each explain() call translates the plan again, adding new operators.
println(tenv.explain(t))
println(tenv.explain(t))

// toAppendStream() translates the plan once more before the final explain().
implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

Stage 7 : Data Source
content : collect elements with CollectionInputFormat

Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 9 : Operator
content : Map
ship_strategy : FORWARD

Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD

Stage 11 : Data Source
content : collect elements with CollectionInputFormat

Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 13 : Operator
content : Map
ship_strategy : FORWARD



Re: Confusions About JDBCOutputFormat

2018-07-11 Thread wangsan
Well, I see. If the connection is established when writing data into the DB, we 
need to cache the rows received since the last write. 

IMO, maybe we do not need to open connections repeatedly or introduce 
connection pools. Testing and refreshing the connection periodically can simply solve 
this problem. I’ve implemented this at 
https://github.com/apache/flink/pull/6301 
<https://github.com/apache/flink/pull/6301>; it would be kind of you to review 
it.
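
Roughly, the idea looks like the simplified sketch below (this is not the actual 
code in the PR; the class and method names are made up for illustration):

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Simplified sketch of "test and refresh the connection before writing";
// not the actual implementation in the PR, names are illustrative.
class RefreshingJdbcWriter {

    private final String dbUrl;
    private final String username;
    private final String password;
    private final String insertQuery;

    private Connection connection;
    private PreparedStatement upload;

    RefreshingJdbcWriter(String dbUrl, String username, String password, String insertQuery) {
        this.dbUrl = dbUrl;
        this.username = username;
        this.password = password;
        this.insertQuery = insertQuery;
    }

    // Re-open the connection if the database has closed it in the meantime.
    private void ensureConnection() throws SQLException {
        if (connection == null || !connection.isValid(5)) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ignored) {
                    // the old connection is broken anyway
                }
            }
            connection = DriverManager.getConnection(dbUrl, username, password);
            upload = connection.prepareStatement(insertQuery);
        }
    }

    // Called both when the batch threshold is reached and when snapshotting state,
    // hence synchronized to avoid the concurrent-modification issue discussed earlier.
    synchronized void flush() throws SQLException {
        ensureConnection();
        upload.executeBatch();
    }
}
```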

Best,
wangsan


> On Jul 11, 2018, at 2:25 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> What I mean is establishing a connection each time we write data into JDBC,
> i.e., establishing a connection in the flush() function. I think this will make
> sure the connection is ok. What do you think?
> 
> On Wed, Jul 11, 2018 at 12:12 AM, wangsan  <mailto:wamg...@163.com>> wrote:
> 
>> Hi Hequn,
>> 
>> Establishing a connection for each batch write may also have the idle
>> connection problem, since we are not sure when the connection will be
>> closed. We call the flush() method when a batch is finished or when snapshotting state,
>> but what if snapshotting is not enabled and the batch size is not reached
>> before the connection is closed?
>> 
>> Maybe we could use a Timer to test the connection periodically and keep
>> it alive. What do you think?
>> 
>> I will open a jira and try to work on that issue.
>> 
>> Best,
>> wangsan
>> 
>> 
>> 
>> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
>> 
>> Hi wangsan,
>> 
>> I agree with you. It would be kind of you to open a jira to check the
>> problem.
>> 
>> For the first problem, I think we need to establish a connection each time
>> we execute a batch write. And it is better to get the connection from a
>> connection pool.
>> For the second problem, to avoid the multithreading problem, I think we should
>> synchronize on the batch object in the flush() method.
>> 
>> What do you think?
>> 
>> Best, Hequn
>> 
>> 
>> 
>> On Tue, Jul 10, 2018 at 2:36 PM, wangsan > <mailto:wamg...@163.com>> wrote:
>> 
>>> Hi all,
>>> 
>>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink
>>> application. But I am confused with the implementation of JDBCOutputFormat.
>>> 
>>> 1. The Connection is established when the JDBCOutputFormat is opened, and
>>> will be used all the time. But if this connection lies idle for a long time,
>>> the database will force-close the connection, and thus errors may occur.
>>> 2. The flush() method is called when batchCount exceeds the threshold,
>>> but it is also called while snapshotting state. So two threads may modify
>>> upload and batchCount without synchronization.
>>> 
>>> Please correct me if I am wrong.
>>> 
>>> ——
>>> wangsan



Re: Confusions About JDBCOutputFormat

2018-07-10 Thread wangsan
Hi Hequn,

Establishing a connection for each batch write may also have the idle connection 
problem, since we are not sure when the connection will be closed. We call the 
flush() method when a batch is finished or when snapshotting state, but what if 
snapshotting is not enabled and the batch size is not reached before the connection is 
closed?

Maybe we could use a Timer to test the connection periodically and keep it 
alive. What do you think?

I will open a jira and try to work on that issue.

Best, 
wangsan



> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> I agree with you. It would be kind of you to open a jira to check the problem.
> 
> For the first problem, I think we need to establish a connection each time 
> we execute a batch write. And it is better to get the connection from a 
> connection pool.
> For the second problem, to avoid the multithreading problem, I think we should 
> synchronize on the batch object in the flush() method.
> 
> What do you think?
> 
> Best, Hequn
> 
> 
> 
> On Tue, Jul 10, 2018 at 2:36 PM, wangsan  <mailto:wamg...@163.com>> wrote:
> Hi all,
> 
> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
> application. But I am confused with the implementation of JDBCOutputFormat.
> 
> 1. The Connection is established when the JDBCOutputFormat is opened, and will 
> be used all the time. But if this connection lies idle for a long time, the 
> database will force-close the connection, and thus errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount without synchronization.
> 
> Please correct me if I am wrong.
> 
> ——
> wangsan
> 



Confusions About JDBCOutputFormat

2018-07-10 Thread wangsan
Hi all,

I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
application. But I am confused with the implementation of JDBCOutputFormat.

1. The Connection is established when the JDBCOutputFormat is opened, and will be 
used all the time. But if this connection lies idle for a long time, the 
database will force-close the connection, and thus errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it 
is also called while snapshotting state. So two threads may modify upload and 
batchCount without synchronization.

Please correct me if I am wrong.

——
wangsan


Re: Question about Timestamp in Flink SQL

2017-11-29 Thread wangsan
Hi Timo,

What I am doing is extracting a timestamp field (which may be a string such as 
“2017-11-28 11:00:00” or a long value based on my current timezone) as the event 
time attribute. So in timestampAndWatermarkAssigner, for the string format I 
should parse the date time string using GMT, and for the long value I should add 
the offset, the opposite of what internalToTimestamp did. But the processing time 
attribute cannot be kept consistent. Am I understanding that correctly?
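
For the record, the compensation I have in mind is roughly the sketch below 
(field handling and names are only illustrative, assuming the raw data was 
produced in the local timezone):

```
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Sketch of the compensation described above; purely illustrative.
class EventTimeExtractor {

    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Parse the local wall-clock string as if it were GMT, so that the
    // offset subtracted later by internalToTimestamp() cancels out.
    static long fromString(String dateTime) throws ParseException {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
        return fmt.parse(dateTime).getTime();
    }

    // Add the local offset, i.e. the opposite of what internalToTimestamp() does.
    static long fromEpochMillis(long epochMillis) {
        return epochMillis + LOCAL_TZ.getOffset(epochMillis);
    }
}
```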

Best,
wangsan



> On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Wangsan,
> 
> currently the timestamps in Flink SQL do not depend on a timezone. All 
> calculations happen on the UTC timestamp. This also guarantees that an input 
> with Timestamp.valueOf("XXX") remains consistent when parsing and outputting 
> it with toString().
> 
> Regards,
> Timo
> 
> 
> Am 11/29/17 um 3:43 AM schrieb wangsan:
>> Hi Xincan,
>> 
>> Thanks for your reply. 
>> 
>> The system default timezone is just what I expected 
>> (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
>>  
>> I looked into the generated code, and I found the following code snippet:
>> 
>> ```
>> result$20 = 
>> org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
>> ```
>> 
>> And what `internalToTimestamp` function did is:
>> 
>> ```
>> public static Timestamp internalToTimestamp(long v) {
>>   return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
>> }
>> ```
>> 
>> So, if I give it an event time with unix timestamp 0, then I get 
>> Timestamp(-28800000). I am confused about why `internalToTimestamp` needs to 
>> subtract the offset?
>> 
>> Best,
>> wangsan
>> 
>> 
>>> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com 
>>> <mailto:xingc...@gmail.com>> wrote:
>>> 
>>> Hi wangsan,
>>> 
>>> in Flink, the ProcessingTime is just implemented by invoking 
>>> System.currentTimeMillis() and the long value will be automatically wrapped 
>>> to a Timestamp with the following statement:
>>> 
>>> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>>> 
>>> You can check your TimeZone.getDefault() to see if it returns the right 
>>> TimeZone. Generally, the returned value should rely on the default TimeZone 
>>> of your operating system.
>>> 
>>> Hope that helps.
>>> 
>>> Best,
>>> Xingcan
>>> 
>>> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com 
>>> <mailto:wamg...@163.com>> wrote:
>>> Hi all,
>>> 
>>> While using Timestamp in Flink SQL, how can I set timezone info? My 
>>> current timezone is GMT+8, and I found the selected processing time is 
>>> always 8 hours later than the current time, and so is the extracted event time.
>>> 
>>> Here’s my simplified code:
>>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>> senv.setParallelism(1)
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> 
>>> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
>>> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
>>> Locale.CHINA).format(new Date())}")
>>> 
>>> val stream: DataStream[(String, String, String)] = 
>>> senv.socketTextStream("localhost", ).map(line => (line, line, line))
>>> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 
>>> 't.proctime)
>>> sTableEnv.registerTable("foo", table)
>>> val result = sTableEnv.sql("select * from foo")
>>> result.printSchema()
>>> result.toAppendStream[Row].print()
>>> 
>>> senv.execute("foo")
>>> And here’s the result:
>>> 
>>> 
>>> 
>>> Best,
>>> wangsan
>>> 
>> 
> 



Re: Question about Timestamp in Flink SQL

2017-11-28 Thread wangsan
Hi Xincan,

Thanks for your reply. 

The system default timezone is just what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
 
I looked into the generated code, and I found the following code snippet:

```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And what `internalToTimestamp` function did is:

```
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, then I get 
Timestamp(-28800000). I am confused about why `internalToTimestamp` needs to subtract 
the offset?
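
To make the effect concrete, here is a small experiment (assuming the JVM default 
time zone is Asia/Shanghai):

```
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {
    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai"); // GMT+8, offset = 28800000 ms
        long v = 0L;                                         // internal value: epoch 0
        // This mirrors what internalToTimestamp() does:
        Timestamp ts = new Timestamp(v - tz.getOffset(v));   // Timestamp(-28800000)
        // With the default zone set to Asia/Shanghai, toString() prints
        // "1970-01-01 00:00:00.0", i.e. the UTC wall clock of the internal value:
        // the subtraction cancels the zone offset that Timestamp.toString() applies.
        System.out.println(ts);
    }
}
```

This reproduces the Timestamp(-28800000) value mentioned above.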

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi wangsan,
> 
> in Flink, the ProcessingTime is just implemented by invoking 
> System.currentTimeMillis() and the long value will be automatically wrapped 
> to a Timestamp with the following statement:
> 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
> 
> You can check your TimeZone.getDefault() to see if it returns the right 
> TimeZone. Generally, the returned value should rely on the default TimeZone 
> of your operating system.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com 
> <mailto:wamg...@163.com>> wrote:
> Hi all,
> 
While using Timestamp in Flink SQL, how can I set timezone info? My 
current timezone is GMT+8, and I found the selected processing time is always 
8 hours later than the current time, and so is the extracted event time.
> 
> Here’s my simplified code:
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> 
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
> Locale.CHINA).format(new Date())}")
> 
> val stream: DataStream[(String, String, String)] = 
> senv.socketTextStream("localhost", ).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
> 
> senv.execute("foo")
> And here’s the result:
> 
> 
> 
> Best,
> wangsan
> 



Re: Hive integration in table API and SQL

2017-11-20 Thread wangsan
Hi Timo,

Thanks for your reply. I do notice that the documentation says "A Table is always 
bound to a specific TableEnvironment. It is not possible to combine tables of 
different TableEnvironments in the same query, e.g., to join or union them.” 
Does that mean there is no way I can perform operations, such as a join, on a streaming 
table and a batch table?

Best,
wangsan

> On 20 Nov 2017, at 9:15 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Timo



Hive integration in table API and SQL

2017-11-20 Thread wangsan
Hi all,

I am currently learning the Table API and SQL in Flink. I noticed that Flink does 
not support Hive tables as a table source, and even a JDBC table source is not 
provided. There are cases where we do need to join a stream table with static Hive or 
other database tables to get more specific attributes, so how can I implement 
this functionality? Do I need to implement my own DataSet connectors to load 
data from external tables using JDBC and register the DataSet as a table, or 
should I provide an external catalog?
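
For the JDBC option specifically, what I have in mind is roughly the sketch below, 
using the existing JDBCInputFormat to load a DataSet and register it as a batch 
table (the driver, URL, query, and field types are placeholders):

```
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class JdbcTableRegistration {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Field types of the external table; placeholders for illustration.
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")         // placeholder driver
                .setDBUrl("jdbc:mysql://host:3306/db")          // placeholder URL
                .setQuery("SELECT id, name FROM my_dim_table")  // placeholder query
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        // Load the external table as a DataSet and register it as a batch table.
        DataSet<Row> dimension = env.createInput(jdbcInput);
        tEnv.registerDataSet("dim", dimension, "id, name");
    }
}
```

This only covers the batch side, though; joining such a table with a streaming 
table is still the open question.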

Thanks,
wangsan


Re:Re: Exception in BucketingSink when cancelling Flink job

2017-09-28 Thread wangsan
Hi,


The 'join' method can be called with a timeout (as is done in TaskCanceler), so it 
won't block forever if the respective thread is in a deadlock state. Maybe 
calling 'interrupt()' after 'join(timeout)' is more reasonable, although it 
still cannot make sure the operations inside the 'close()' method are finished.
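
Just to illustrate the ordering I mean (only a sketch, not the actual TaskCanceler 
code; names are made up):

```
// Sketch of "join with a timeout first, interrupt only if the thread is still alive".
final class GracefulCancel {
    static void cancelTask(Thread executer, long interruptIntervalMillis) throws InterruptedException {
        executer.join(interruptIntervalMillis);   // give close() a chance to finish cleanly
        if (executer.isAlive()) {
            executer.interrupt();                 // escalate only if the task thread is stuck
            executer.join(interruptIntervalMillis);
        }
    }
}
```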


Best,
wangsan


On 2017-09-29 at 01:52, "Stephan Ewen" <step...@data-artisans.com> wrote:


Hi!


Calling 'interrupt()' only makes sense before 'join()', because 'join()' blocks 
until the respective thread is finished.
The 'interrupt()' call happens to cancel the task out of potentially blocking 
I/O or sleep/wait operations.


The problem is that HDFS does not handle interrupts correctly; it sometimes 
deadlocks in the case of interrupts on unclosed streams :-(


I think it would be important to make sure (in the Bucketing Sink) that the DFS 
streams are closed upon task cancellation.
@aljoscha - adding you to this thread, as you know most about the bucketing 
sink.


Best,
Stephan




On Wed, Sep 27, 2017 at 10:18 AM, Stefan Richter <s.rich...@data-artisans.com> 
wrote:

Hi,


I would speculate that the reason for this order is that we want to shut down 
the tasks quickly by interrupting blocking calls in the event of failure, so 
that recovery can begin as fast as possible. I am looping in Stephan, who might 
give more details about this code.


Best,
Stefan 


On 27.09.2017 at 07:33, wangsan <wamg...@163.com> wrote:



After digging into the source code, we found that when a Flink job is canceled, a 
TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically 
interrupts the task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}
// ...
executer.interrupt();
try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}
// ...

Notice that TaskCanceler first sends an interrupt signal to the task thread and 
then calls the join method. Since the task thread is now trying to close the 
DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown 
in the task thread.

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      // when we receive an ack, we notify on dataQueue
      dataQueue.wait(1000);
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}

I was confused about why TaskCanceler calls executer.interrupt() before 
executer.join(interruptInterval). Can anyone help?


Hi,


We are currently using BucketingSink to save data into HDFS in Parquet format. 
But when the Flink job was cancelled, we always got an exception in 
BucketingSink's close method. The detailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
...

at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


It seems that the DFSOutputStream hasn't been closed before the task thread is 
forcibly terminated. We found a similar problem in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
, but setting "akka.ask.timeout" to a larger value does not work for us. So 
how can we make sure the stream is safely closed when cancelling a job?


Best,
wangsan


Exception in BucketingSink when cancelling Flink job

2017-09-26 Thread wangsan
Hi,


We are currently using BucketingSink to save data into HDFS in Parquet format. 
But when the Flink job was cancelled, we always got an exception in 
BucketingSink's close method. The detailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
...

at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


It seems that the DFSOutputStream hasn't been closed before the task thread is 
forcibly terminated. We found a similar problem in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
, but setting "akka.ask.timeout" to a larger value does not work for us. So 
how can we make sure the stream is safely closed when cancelling a job?


Best,
wangsan