Is it possible to specify a join algorithm hint in Flink SQL

2019-04-15 Thread yinhua.dai
Hi team,

I know we can specify join algorithm hints with the DataSet API:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#join-algorithm-hints

But I'm wondering if this is also possible with the SQL API?
We have market data keyed by currency ID (a big data set), and we tried to join it with a very small data set (a currency ID to currency name mapping) to get the name, but it is always very slow because of data skew (most of the currencies are USD).

It seems Flink didn't choose the broadcast-forward ship strategy, so I'm wondering whether we can provide a hint for it, or how else I can solve the problem?

Thanks.
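
For reference, the DataSet-API workaround from the linked page looks roughly like this (a minimal sketch; the data sets and field layout below are made up for illustration):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BroadcastJoinHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins: the large, skewed fact side and the tiny dimension side.
        DataSet<Tuple2<Integer, Double>> marketData =
                env.fromElements(Tuple2.of(1, 100.0), Tuple2.of(1, 101.5), Tuple2.of(2, 0.9));
        DataSet<Tuple2<Integer, String>> currencyNames =
                env.fromElements(Tuple2.of(1, "USD"), Tuple2.of(2, "EUR"));

        marketData
                // BROADCAST_HASH_SECOND replicates the small side to every parallel
                // task, so the skewed key never funnels through a single node.
                .join(currencyNames, JoinHint.BROADCAST_HASH_SECOND)
                .where(0)
                .equalTo(0)
                .projectFirst(1)   // price
                .projectSecond(1)  // currency name
                .print();
    }
}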





Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
I have put the log of the data sink's task manager at
https://gist.github.com/yinhua2018/7de42ff9c1738d5fdf9d99030db903e2





Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
Hi Qi,

I checked, and the JVM heap of the sink TM is low.

I tried to read the Flink source code to identify where exactly the error happens.
I think the exception happened inside DataSinkTask.invoke():

// work!
while (!this.taskCanceled && ((record = input.next()) != null)) {
    numRecordsIn.inc();
    format.writeRecord(record);
}

The RemoteTransportException should be thrown from "input.next()" when the InputGate tries to read data from the upstream.
Is this really a problem with this sink TM?
I'm a little bit confused.





RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
Hi,

I wrote a single Flink job with Flink SQL, version 1.6.1.
I have one table source which reads data from a database, and one table sink which writes out an Avro format file.
The table source has a parallelism of 19, and the table sink only has a parallelism of 1.

But there is always a RemoteTransportException when the job is nearly done (all data sources have finished, and the data sink has been running for a while).
The detailed error is below:

2019-03-28 07:53:49,086 ERROR org.apache.flink.runtime.operators.DataSinkTask - Error in user code: Connection unexpectedly closed by remote task manager 'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'. This might indicate that the remote task manager was lost.: DataSink (com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad) (1/1)
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
2019-03-28 07:53:49,440 INFO  com.tr.apt.sqlengine.tables.s3.AbstractFileOutputFormat - FileTableSink sinked all data to: file:///tmp/shareamount.avro
2019-03-28 07:53:49,441 INFO  org.apache.flink.runtime.taskmanager.Task - DataSink (com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad) (1/1) (31fd3e6fdbb1576e7288e202fff69b07) switched from RUNNING to FAILED.


Does the error mean that the data sink failed to read all of the data from some data source instance before the source ended?

When I check the log of the task manager (10.97.34.40:46625), everything looks OK: it shows that it successfully finished its job, received SIGNAL 15, and then terminated itself.
So how should I find out the root cause of the error?






Execution sequence for slot sharing

2019-03-26 Thread yinhua.dai
Hi Community,

Can anyone help me understand the execution sequence in batch mode?

1. Can I set slot isolation in batch mode? I can only find the slotSharingGroup API in streaming mode.

2. When multiple data source parallel instances are allocated to the same slot, how does Flink run those data sources? Do they run one by one, or does each data source run in parallel in the same slot?


Thank you.





Job crashed very soon

2019-03-21 Thread yinhua.dai
Hi Community,

I was trying to run a big batch job which uses JDBCInputFormat to retrieve a large amount of data from a MySQL database and do some joins in Flink; the environment is AWS EMR. But it always fails very quickly.

I'm using Flink on YARN, Flink 1.6.1.
My cluster has 1000 GB of memory, and my job parameters are:
-yD akka.ask.timeout=60s -yD akka.framesize=300m -yn 50 -ys 2 -yjm 8192 -ytm 8192 -p 40

I have 6 data sources reading different tables, and most of them are set to a parallelism of 100.
I can only see the WARN logs below in the aggregated YARN logs; the whole log is too big.

2019-03-21 09:10:16,430 WARN  org.apache.flink.runtime.taskmanager.Task - Task 'CHAIN DataSource (at createInput(ExecutionEnvironment.java:548) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) -> FlatMap (where: (AND(=(VALUE, _UTF-16LE'true'), =(ATTRIBUTENAME, _UTF-16LE'QuoteCurrencyId'))), select: (QUOTEID)) (10/50)' did not react to cancelling signal for 30 seconds, but is stuck in method:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.FilterInputStream.read(FilterInputStream.java:133)
com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:108)
com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:57)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:61)
com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1685)
com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1698)
com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1752)
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1041)
com.mysql.cj.NativeSession.execSQL(NativeSession.java:1157)
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:947)
com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1020)
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:238)
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:748)

2019-03-21 09:10:16,883 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-10-97-33-195.tr-fr-nonprod.aws-int.thomsonreuters.com:41133] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2019-03-21 09:10:18,447 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-03-21 09:10:18,448 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
2019-03-21 09:10:18,448 INFO  org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
2019-03-21 09:10:18,448 INFO  org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache

2019-03-21 09:08:36,913 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/10.97.33.195:39282] failed with java.io.IOException: Connection reset by peer
2019-03-21 09:08:36,913 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-10-97-33-195.tr-fr-nonprod.aws-int.thomsonreuters.com:45423] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2019-03-21 09:08:37,020 WARN  org.apache.flink.yarn.YarnResourceManager - Discard registration from TaskExecutor container_1553143971811_0015_01_59 at (akka.tcp://fl...@ip-10-97-36-43.tr-fr-nonprod.aws-int.thomsonreuters.com:44685/user/taskmanager_0) because the framework did not recognize it
2019-03-21 09:08:37,020 WARN  org.apache.flink.yarn.

Re: What should I take care of if I enable object reuse

2019-03-17 Thread yinhua.dai
Got it, thanks guys.





Re: What should I take care of if I enable object reuse

2019-03-14 Thread yinhua.dai
Hi Elias,

Thanks.
Would it be good enough as long as we always use a different object when calling the Collector.collect() method in the operator?





What should I take care of if I enable object reuse

2019-03-13 Thread yinhua.dai
Hi Community,

I saw in the documentation that we need to be careful about enabling the object reuse feature.
So which parts should I check to avoid any issues? Can anyone help to summarize?
Thank you.

From the documentation:

*enableObjectReuse() / disableObjectReuse()* By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
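
For concreteness, a minimal sketch of what I understand the feature and the caveat to mean (my own illustration, not from the docs): don't keep references to input objects across function calls, and don't modify an object after emitting it.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse(); // the runtime may now recycle record objects

        env.fromElements(Tuple2.of("USD", 1.0), Tuple2.of("EUR", 0.9))
                .map(new MapFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(Tuple2<String, Double> value) {
                        // Safe: emit a fresh object; the input instance may be reused
                        // by the runtime. Unsafe would be caching 'value' in a field
                        // and reading it again on a later call.
                        return Tuple2.of(value.f0, value.f1 * 2);
                    }
                })
                .print();

        env.execute("object-reuse-sketch");
    }
}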





Re: flink submit job rest api classpath jar

2019-02-17 Thread yinhua.dai
Maybe you could consider putting your UDF jar into flink/lib before job submission.





Re: Limit in batch flink sql job

2019-02-12 Thread yinhua.dai
OK, thanks.
It might be better to update the documentation, which has the following example that confused me:

SELECT *
FROM Orders
LIMIT 3





Limit in batch flink sql job

2019-02-11 Thread yinhua.dai
Why does Flink say "Limiting the result without sorting is not allowed as it could lead to arbitrary results" when I use LIMIT in batch mode?

SELECT * FROM csvSource limit 10;
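
(As far as I can tell, the batch planner accepts the LIMIT once an ORDER BY is present; the sort column below is made up:

SELECT * FROM csvSource ORDER BY someField LIMIT 10;)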





Get UnknownTaskExecutorException when I add a new configuration in flink-conf.yaml

2019-01-31 Thread yinhua.dai
Hi Community,

I added the item below to flink-conf.yaml, and I see an UnknownTaskExecutorException each time I start Flink on Windows via start-cluster.bat:

*fs.s3a.aws.credentials.provider: com.tr.apt.sqlengine.tables.aws.HadoopAWSCredentialsProviderChain*

I'm sure that this new configuration is successfully read by Flink and the class is already on the class path, and there is no error if I remove this line. I'm using Flink 1.6.0.

2019-02-01 15:40:57,148 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under d6f5b0f7dd86432ffb515edea31d4f01.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:558)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Re: Is there a way to get all Flink built-in SQL functions

2019-01-25 Thread yinhua.dai
Thanks guys.
I was just wondering if there is another way besides hard-coding the list :)
Thanks anyway.





Is there a way to get all Flink built-in SQL functions

2019-01-22 Thread yinhua.dai
I would like to put this list into our self-service Flink SQL web UI.
Thanks.





Re: The way to write a UDF with generic type

2019-01-08 Thread yinhua.dai
Got it, thanks.






Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-07 Thread yinhua.dai
In our case, we wrote a console table sink which prints everything to the console, and used "insert into" to write the interim result to the console.





Re: The way to write a UDF with generic type

2019-01-07 Thread yinhua.dai
Hi Timo,

Can you let me know how the built-in "MAX" function is able to support different types?
Thanks.





Re: The way to write a UDF with generic type

2019-01-06 Thread yinhua.dai
Hi Timo,

But getResultType should only return a concrete type information, right?
How could I implement it with a generic type?

I'd like to clarify my question again.
Say I want to implement my own "MAX" function, but I want to apply it to different types, e.g. integer, long, double etc., so I tried to write a class which extends AggregateFunction *with a generic type* to implement the max function.

Then I want to register only one function name for all types, e.g.:
tableEnv.registerFunction("MYMAX", new MyMax());
instead of:
tableEnv.registerFunction("MYINTEGERMAX", new MyIntegerMax());
tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
tableEnv.registerFunction("MYDOUBLEMAX", new MyDoubleMax());

Is there a way to implement that?
I know the built-in function "MAX" can be applied to all types, so I wonder if I can implement that as well.
Thanks.





Re: The way to write a UDF with generic type

2019-01-04 Thread yinhua.dai
Hi Chesnay,

Maybe you misunderstood my question.
I have the code below:
public class MyMaxAggregation<T> extends AggregateFunction<T, MyMaxAggregation.MyAccumulator> {
  @Override
  public MyAccumulator createAccumulator() {
    return new MyAccumulator();
  }

  @Override
  public T getValue(MyAccumulator accumulator) {
    return null;
  }

  static class MyAccumulator {
    double maxValue;
  }
}

But tableEnv.registerFunction("MYMAX", new MyMaxAggregation());
throws an exception as below:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
    at org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
    at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)
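
As the exception says, the type has to be specified explicitly using type information. A sketch of one way to do that (my own illustration, not the built-in MAX implementation): pass the TypeInformation in through the constructor and return it from getResultType(); the accumulator needs similar care because it is generic too.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

public class MyMax<T extends Comparable<T>> extends AggregateFunction<T, MyMax.MaxAcc<T>> {

    public static class MaxAcc<T> {
        public T max; // null until the first value arrives
    }

    private final TypeInformation<T> resultType;

    public MyMax(TypeInformation<T> resultType) {
        this.resultType = resultType;
    }

    @Override
    public MaxAcc<T> createAccumulator() {
        return new MaxAcc<>();
    }

    // invoked by the runtime for every input row
    public void accumulate(MaxAcc<T> acc, T value) {
        if (value != null && (acc.max == null || value.compareTo(acc.max) > 0)) {
            acc.max = value;
        }
    }

    @Override
    public T getValue(MaxAcc<T> acc) {
        return acc.max;
    }

    @Override
    public TypeInformation<T> getResultType() {
        return resultType; // sidesteps the erasure problem from the stack trace above
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInformation<MaxAcc<T>> getAccumulatorType() {
        // raw class: the accumulator falls back to generic (Kryo) serialization
        return (TypeInformation) TypeInformation.of(MaxAcc.class);
    }
}

Registration would then look like tableEnv.registerFunction("MYMAX", new MyMax<>(Types.INT)) (Types from org.apache.flink.api.common.typeinfo); still one instance per concrete type, but at least a single class.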






The way to write a UDF with generic type

2019-01-03 Thread yinhua.dai
Hi Community,

I tried to write a UDF with a generic type, but it seems that Flink complains about not recognizing the type information when I use it in SQL.

I checked the implementation of the native function "MAX" and realized that it's not using the same API (e.g. AggregateFunction) as user-defined functions; is that the reason why "MAX" doesn't have the generic type issue?

How can I implement my own "MAX" function which could support all types?
Thanks.





Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2019-01-03 Thread yinhua.dai
Hi Fabian,

It's the submission of the jar file that costs too much time.
And yes, Hequn's and your suggestion works, but I'm just curious why a 100M jar file takes so long to submit. Is it related to some upload parameter settings of the web layer?





Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2018-12-30 Thread yinhua.dai
I'll have to do that for now; however, I'll need to find another way, because the jar sometimes gets updated and the Flink cluster will be remote in the future.




Flink SQL client always cost 2 minutes to submit job to a local cluster

2018-12-27 Thread yinhua.dai
I am using Flink 1.6.1. I tried to use the Flink SQL client with some of my own jars via --jar and --library.
Executing SQL queries works, however it always takes around 2 minutes to submit the job to the local cluster. When I copy my jar to flink/lib and remove the --jar and --library parameters, it submits the job immediately.

I debugged and found the 2 minutes are spent by the RestClusterClient sending the request, with the jar as payload, to the Flink cluster.

I don't know why it takes 2 minutes to upload the package. Is there a way to work around it?
Thanks.





Re: how to override s3 key config in flink job

2018-11-27 Thread yinhua.dai
It might be difficult, as the task managers and the job manager are pre-started in session mode.

It seems that the Flink HTTP server will always use the configuration that you specified when you started your Flink cluster (i.e., via start-cluster.sh); I don't see a way to override it.





Re: how to override s3 key config in flink job

2018-11-26 Thread yinhua.dai
Which Flink version are you using?
I know how it works on YARN, but I'm not very clear about standalone mode.





Re: where can I see logs from code

2018-11-26 Thread yinhua.dai
The code running in your main method will log to the Flink CLI log; other code, like map functions etc., will log to the task manager log.

Are you saying that you only see Flink code in
http://SERVERADD/#/taskmanager/TM_ID/log?
It might be useful if you elaborated on your environment.





Re: your advice please regarding state

2018-11-26 Thread yinhua.dai
General approach #1 is OK, but you may have to use a hash-based key selector if you have heavy data skew.





Re: how to override s3 key config in flink job

2018-11-26 Thread yinhua.dai
Did you try "-Dkey=value"?





Re: TaskManager & task slots

2018-11-21 Thread yinhua.dai
OK, thanks.





Re: About the issue caused by flink's dependency jar package submission method

2018-11-20 Thread yinhua.dai
As far as I know, -yt works for both the job manager and the task managers; -C works for the Flink CLI.

Did you consider putting all your jars in /flink/lib?





Re: TaskManager & task slots

2018-11-20 Thread yinhua.dai
Hi Fabian,

Does the description below still hold true in Flink 1.6?

Slots do not guard CPU time, IO, or JVM memory. At the moment they only
isolate managed memory which is only used for batch processing. For
streaming applications their only purpose is to limit the number of parallel
threads that can be executed by a TaskManager.





Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-08 Thread yinhua.dai
I was able to write a single operator as you suggested, thank you.

Then I saw ContinuousProcessingTimeTrigger in the Flink source code; it looks like it's what I am looking for. If there is a way to customize the SQL "TUMBLE" window to use this trigger instead of ProcessingTimeTrigger, then it should solve my problem.

Do you know if there is a way to use a customized trigger in the "TUMBLE" window of SQL?
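
For comparison, on the DataStream side the trigger can be swapped out directly. A minimal sketch (made-up data, processing time) of the behavior I would like TUMBLE to have:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class ContinuousTriggerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("USD", 1.0), Tuple2.of("EUR", 0.9))
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // fire every 10 seconds within the 1-minute window
                // instead of only once at its end
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .sum(1)
                .print();

        env.execute("continuous-trigger-sketch");
    }
}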





Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-08 Thread yinhua.dai
Hi Piotr,

Thank you for your explanation.
I basically understand your meaning. As far as I understand, we can write a custom window assigner and a custom trigger, and we can register the timer when the window processes elements.

But how can we register a timer when no elements are received during a time window?
My requirement is to always fire at the end of the time window, even when there is no result from the SQL query.
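
For reference, what I am trying to achieve looks roughly like the following KeyedProcessFunction sketch on the DataStream side (key and value types are made up; this is not the SQL solution I am asking for):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EmitLastEveryMinute
        extends KeyedProcessFunction<String, Tuple2<String, Double>, Tuple2<String, Double>> {

    private transient ValueState<Tuple2<String, Double>> last;

    @Override
    public void open(Configuration parameters) {
        last = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "last", Types.TUPLE(Types.STRING, Types.DOUBLE)));
    }

    @Override
    public void processElement(Tuple2<String, Double> value, Context ctx,
            Collector<Tuple2<String, Double>> out) throws Exception {
        if (last.value() == null) {
            // first record for this key: start the periodic timer chain
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
        }
        last.update(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple2<String, Double>> out) throws Exception {
        if (last.value() != null) {
            // re-emit the last result even if nothing arrived in this interval
            out.collect(last.value());
        }
        // re-register so the timer keeps firing once per minute
        ctx.timerService().registerProcessingTimeTimer(timestamp + 60_000L);
    }
}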






Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-06 Thread yinhua.dai
Hi Piotr,

Can you elaborate more on the solution with the custom operator?
I don't think there will be any records from the SQL query if no input data comes in within the time window, even if we convert the result to a datastream.





Always trigger calculation of a tumble window in Flink SQL

2018-11-05 Thread yinhua.dai
We have a requirement to always trigger a calculation on a timer basis, e.g. every 1 minute.

*If records come into Flink during the time window, then calculate the normal way, i.e. aggregate for each record and getResult() at the end of the time window.*

*If no records come into Flink during the time window, then send the last calculated result.*

I know that Flink will not trigger the calculation in the second case (when no records come into the system during the time window). Is there a solution for me in Flink SQL?





Re: How to configure TaskManager's JVM options through cmdline?

2018-10-23 Thread yinhua.dai
You can try *-yD env.java.opts.taskmanager="-XX:+UseConcMarkSweepGC"* if you are running Flink on YARN.
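
For example (the jar name is made up):

flink run -m yarn-cluster -yD env.java.opts.taskmanager="-XX:+UseConcMarkSweepGC" ./my-job.jar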





Re: Reverse the order of fields in Flink SQL

2018-10-23 Thread yinhua.dai
Hi Timo,

I wrote some simple test code for the issue; please check out
https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1

I wrote a custom table source which just uses RowCsvInputFormat to create the dataset, used the provided CsvTableSink, and I can reproduce the issue.





Re: Reverse the order of fields in Flink SQL

2018-10-23 Thread yinhua.dai
I think I have already done that in my custom sink.

@Override
public String[] getFieldNames() {
    return this.fieldNames;
}

@Override
public TypeInformation<?>[] getFieldTypes() {
    return this.fieldTypes;
}

@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    this.fieldNames = fieldNames;
    this.fieldTypes = fieldTypes;
    return this;
}
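
(One caveat I am not 100% sure about: the TableSink#configure Javadoc asks for a configured *copy* of the sink rather than mutating this. A sketch, with a hypothetical MyTableSink:

@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    MyTableSink copy = new MyTableSink(); // hypothetical no-arg constructor
    copy.fieldNames = fieldNames;
    copy.fieldTypes = fieldTypes;
    return copy;
})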





Reverse the order of fields in Flink SQL

2018-10-23 Thread yinhua.dai
I wrote a customized table source, and it emits some fields, let's say f1, f2.

Then I just write to a sink with a reversed order of fields, as below:
*select f2, f1 from customTableSource*

And I found that it actually doesn't reverse the fields.

Then I tried with the Flink-provided CsvTableSource and CsvTableSink, and found that they have no problem reversing the order. After some investigation I found that it's related to two things:
1. *CsvTableSource* implements *ProjectableTableSource*
2. *RowCsvInputFormat* supports *selectedFields*

Do I have to do these two things as well in my custom table source to get the reversal to work?
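
In case it helps to make the question concrete, here is roughly what I understand those two pieces to look like together. This is only a sketch against the Flink 1.6 flink-table APIs; the class name is made up, and the exact RowCsvInputFormat constructor overloads may differ between versions:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

public class ProjectableCsvSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {

    private final Path path;
    private final String[] fieldNames;              // full schema
    private final TypeInformation<?>[] fieldTypes;  // full schema
    private final int[] selectedFields;             // indices into the full schema

    public ProjectableCsvSource(Path path, String[] fieldNames,
            TypeInformation<?>[] fieldTypes, int[] selectedFields) {
        this.path = path;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.selectedFields = selectedFields;
    }

    @Override
    public TableSource<Row> projectFields(int[] fields) {
        // called by the optimizer, e.g. with {1, 0} for "select f2, f1"
        return new ProjectableCsvSource(path, fieldNames, fieldTypes, fields);
    }

    @Override
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        // the selectedFields variant makes the format emit only the projected fields
        RowCsvInputFormat format =
                new RowCsvInputFormat(path, fieldTypes, "\n", ",", selectedFields);
        return execEnv.createInput(format, getReturnType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        TypeInformation<?>[] types = new TypeInformation<?>[selectedFields.length];
        String[] names = new String[selectedFields.length];
        for (int i = 0; i < selectedFields.length; i++) {
            types[i] = fieldTypes[selectedFields[i]];
            names[i] = fieldNames[selectedFields[i]];
        }
        return new RowTypeInfo(types, names);
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(fieldNames, fieldTypes); // always the full schema
    }

    @Override
    public String explainSource() {
        return "ProjectableCsvSource(" + String.join(", ", fieldNames) + ")";
    }
}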





How to access Ftp server with passive mode

2018-10-18 Thread yinhua.dai
I am using Flink to download and process a big file from a remote FTP server in AWS EMR.
As Flink supports the FTP protocol through the Hadoop FTP file system, I use the CsvInputFormat with an FTP address (ftp://user:pass@server/path/file).

It works correctly on my local machine, but when I run the job in EMR, it fails to establish the connection between the EMR node and the FTP server.
After some investigation I found that we have to use FTP passive mode to access the FTP server in AWS, but from checking the source code of FTPFileSystem, it looks like it always uses active mode. I can't find a way to inject and modify the behavior to use passive mode, although FTPClient has a method to do that.

Do you have any suggestions? Thank you.





Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Hi Gary,

Yes, you are right, we are using attach mode.
I will try to put my jar into flink/lib to get around the issue.
Thanks.

I will open a JIRA for the discrepancy between Flink 1.3 and 1.5, thanks a lot.





Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Meanwhile, I can see the code below in Flink 1.5:

public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
    key("yarn.per-job-cluster.include-user-jar")
        .defaultValue("ORDER")
        .withDescription("Defines whether user-jars are included in the system class path " +
            "for per-job-clusters as well as their positioning in the path. They can be " +
            "positioned at the beginning (\"FIRST\"), at the end (\"LAST\"), or be positioned " +
            "based on their name (\"ORDER\"). Setting this parameter to " +
            "\"DISABLED\" causes the jar to be included in the user class path instead.");

Does this mean the user jar should always be included in the class path?
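
(If so, I assume the placement could also be forced explicitly, e.g. in flink-conf.yaml:

yarn.per-job-cluster.include-user-jar: FIRST

though I have not verified this.)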





Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Hi Timo,

I didn't try to configure the classloader order; according to the documentation, it should only be needed for yarn-session mode, right?

I can see the shipped files (-yt /path/dir/) are present in the job manager's class path, so maybe I should put my uber jar in the -yt path so that it will be shipped and added to the class path in Flink 1.5?





User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
We have a customized log4j layout implementation, so we need the Flink job manager/task manager to be able to load the logger implementation, which is packaged in the uber jar.

However, we noticed that in Flink 1.3 the user jar is put at the beginning of the job manager's class path; when we do the same in Flink 1.5, the user jar is not there any more.
Is this expected?

I saw this in the documentation:
*When submitting a Flink job/application directly to YARN (via bin/flink run -m yarn-cluster ...), dedicated TaskManagers and JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in the Java classpath. That means that there is no dynamic classloading involved in that case.*

And we are using Flink on YARN in per-job mode, so we are confused by what we are experiencing now.





UnsatisfiedLinkError when using flink-s3-fs-hadoop

2018-09-10 Thread yinhua.dai
Hi,

I experienced an UnsatisfiedLinkError when I tried to use flink-s3-fs-hadoop to sink to S3 on my local Windows machine.

I googled and tried several solutions, like downloading hadoop.dll and winutils.exe, setting the HADOOP_HOME and PATH environment variables, and copying hadoop.dll to C:\Windows\System32; none of them worked.

I also tried to load the Hadoop library myself in the code by using "System.loadLibrary("hadoop")"; it succeeded, but the error still happens. Is there any additional step I am missing?

Attaching my code (*the 2nd line is just there for debugging, and it is what throws the exception*):

public static void main(String[] args) throws Exception {
    System.loadLibrary("hadoop");
    NativeIO.Windows.access(null, NativeIO.Windows.AccessRight.ACCESS_READ);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<String> ds = env.readTextFile("s3://fts-test/test/input.csv");
    ds.print();
    ds.writeAsText("s3://fts-test/test/output.csv");

    FileSystem.initialize(GlobalConfiguration.loadConfiguration("D:\\Technology\\flink\\flink-1.5.3\\conf"));

    env.execute();
}





Re: Facing Issues while migrating to S3A

2018-09-09 Thread yinhua.dai
Hi,

I still have the same problem; I googled many ways but still failed.
I have downloaded hadoop.dll and winutils.exe and added them to the class path.

To verify that it is working, I called "System.loadLibrary("hadoop")" at the beginning of my Java program and it succeeded.

BTW: I run my program on Windows 7 64-bit.


