Is it possible to specify a join algorithm hint in Flink SQL
Hi team, I know we can specify a join algorithm hint with the DataSet API: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#join-algorithm-hints But I'm wondering whether this is also possible with the SQL API? We have market data keyed by a currency id (a big data set), and we tried to join a very small data set (currency id to currency name conversion) against it to get the name, but the join is always very slow because of data skew (most of the currencies are USD). It seems Flink didn't choose the broadcast-forward ship strategy, so I'm wondering whether we can provide a hint for it, or how else I can solve the problem? Thanks.
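For reference, a minimal sketch of the DataSet API workaround the poster links to, broadcasting the small currency table via a join hint; the toy Tuple2 inputs here are hypothetical stand-ins for the real tables:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// f0 = currency id, f1 = price (the big, skewed input)
DataSet<Tuple2<Integer, Double>> market = env.fromElements(
        Tuple2.of(1, 100.0), Tuple2.of(1, 101.5));
// f0 = currency id, f1 = currency name (the tiny lookup table)
DataSet<Tuple2<Integer, String>> currencies = env.fromElements(
        Tuple2.of(1, "USD"), Tuple2.of(2, "EUR"));

// BROADCAST_HASH_SECOND ships the small second input to every parallel
// instance of the first, so the skewed big input is never shuffled.
market.join(currencies, JoinHint.BROADCAST_HASH_SECOND)
      .where(0)
      .equalTo(0)
      .print();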
Re: RemoteTransportException: Connection unexpectedly closed by remote task manager
I have put the data sink task manager's log at https://gist.github.com/yinhua2018/7de42ff9c1738d5fdf9d99030db903e2
Re: RemoteTransportException: Connection unexpectedly closed by remote task manager
Hi Qi, I checked and the JVM heap of the sink TM is low. I tried to read the Flink source code to identify where exactly the error happens. I think the exception happened inside DataSinkTask.invoke():

// work!
while (!this.taskCanceled && ((record = input.next()) != null)) {
    numRecordsIn.inc();
    format.writeRecord(record);
}

The RemoteTransportException should be thrown from "input.next()" when the InputGate tried to read data from upstream. Is this really a problem of this sink TM? I'm a little bit confused.
RemoteTransportException: Connection unexpectedly closed by remote task manager
Hi, I wrote a simple Flink job with Flink SQL on version 1.6.1. I have one table source which reads data from a database, and one table sink which outputs an Avro-format file. The table source has a parallelism of 19, and the table sink only has a parallelism of 1. But there is always a RemoteTransportException when the job is nearly done (all data sources have finished, and the data sink has been running for a while). The detailed error is below:

2019-03-28 07:53:49,086 ERROR org.apache.flink.runtime.operators.DataSinkTask - Error in user code: Connection unexpectedly closed by remote task manager 'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'. This might indicate that the remote task manager was lost.: DataSink (com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad) (1/1)
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
2019-03-28 07:53:49,440 INFO com.tr.apt.sqlengine.tables.s3.AbstractFileOutputFormat - FileTableSink sinked all data to : file:///tmp/shareamount.avro
2019-03-28 07:53:49,441 INFO org.apache.flink.runtime.taskmanager.Task - DataSink (com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad) (1/1) (31fd3e6fdbb1576e7288e202fff69b07) switched from RUNNING to FAILED.

Does the error mean that the data sink failed to read all of the data from some data source instance before that source ended? When I check the log of the task manager (10.97.34.40:46625), everything looks OK: it shows that it successfully finished its job, received SIGNAL 15, and then terminated itself. So how should I find out the root cause of the error?
Execution sequence for slot sharing
Hi Community, can anyone help me understand the execution sequence in batch mode? 1. Can I set slot isolation in batch mode? I can only find the slotSharingGroup API in streaming mode. 2. When multiple data source parallel instances are allocated to the same slot, how does Flink run those data sources? Do they run one by one, or does each data source run in parallel in the same slot? Thank you.
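For context, a minimal sketch of the streaming API the poster refers to (the source, group names, and map function here are hypothetical); there is no DataSet-API equivalent of this call:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Operators in different slot sharing groups never share a slot; by default
// every operator belongs to the group "default".
DataStream<Long> src = env.generateSequence(0, 1000).slotSharingGroup("sources");
src.map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) {
            return value * 2;
        }
    })
   .slotSharingGroup("transforms")
   .print();
env.execute("slot sharing sketch");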
Job crashed very soon
Hi Community, I was trying to run a big batch job which uses JDBCInputFormat to retrieve a large amount of data from a MySQL database and do some joins in Flink; the environment is AWS EMR. But it always fails very quickly. I'm using Flink on YARN, Flink 1.6.1. My cluster has 1000 GB of memory, and my job parameters are:

-yD akka.ask.timeout=60s -yD akka.framesize=300m -yn 50 -ys 2 -yjm 8192 -ytm 8192 -p 40

I have 6 data sources on different tables, and most of them are set to a parallelism of 100. I can only see the WARN logs below in the aggregated YARN logs; the whole log is too big.

2019-03-21 09:10:16,430 WARN org.apache.flink.runtime.taskmanager.Task - Task 'CHAIN DataSource (at createInput(ExecutionEnvironment.java:548) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) -> FlatMap (where: (AND(=(VALUE, _UTF-16LE'true'), =(ATTRIBUTENAME, _UTF-16LE'QuoteCurrencyId'))), select: (QUOTEID)) (10/50)' did not react to cancelling signal for 30 seconds, but is stuck in method:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.FilterInputStream.read(FilterInputStream.java:133)
com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:108)
com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:57)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:61)
com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1685)
com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1698)
com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1752)
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1041)
com.mysql.cj.NativeSession.execSQL(NativeSession.java:1157)
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:947)
com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1020)
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:238)
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:748)
2019-03-21 09:10:16,883 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-10-97-33-195.tr-fr-nonprod.aws-int.thomsonreuters.com:41133] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2019-03-21 09:10:18,447 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-03-21 09:10:18,448 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
2019-03-21 09:10:18,448 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
2019-03-21 09:10:18,448 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
2019-03-21 09:08:36,913 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [/10.97.33.195:39282] failed with java.io.IOException: Connection reset by peer
2019-03-21 09:08:36,913 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-10-97-33-195.tr-fr-nonprod.aws-int.thomsonreuters.com:45423] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2019-03-21 09:08:37,020 WARN org.apache.flink.yarn.YarnResourceManager - Discard registration from TaskExecutor container_1553143971811_0015_01_59 at (akka.tcp://fl...@ip-10-97-36-43.tr-fr-nonprod.aws-int.thomsonreuters.com:44685/user/taskmanager_0) because the framework did not recognize it
2019-03-21 09:08:37,020 WARN org.apache.flink.yarn.
Re: What should I take care if I enable object reuse
Got it, thanks guys.
Re: What should I take care if I enable object reuse
Hi Elias, thanks. Would it be good enough as long as we always use a different object when calling the Collector.collect() method in the operator?
What should I take care if I enable object reuse
Hi Community, I saw from the documentation that we need to be careful about enabling the object reuse feature. So what should I check to avoid any issues? Can anyone help to summarize? Thank you.

// enableObjectReuse() / disableObjectReuse() By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
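To illustrate the main pitfall, a minimal sketch (the BufferingMap class is hypothetical): with object reuse enabled, the runtime may hand the same input instance to a function on every call, so holding references across invocations is unsafe.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
import java.util.List;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

// With object reuse on, 'value' may be the SAME instance on every call,
// mutated in place by the runtime.
public static class BufferingMap extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private final List<Tuple2<String, Long>> seen = new ArrayList<>();

    @Override
    public Tuple2<String, Long> map(Tuple2<String, Long> value) {
        // seen.add(value);      // broken: the list fills up with one reused object
        seen.add(value.copy());  // safe: store a defensive copy
        return value;
    }
}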
Re: flink submit job rest api classpath jar
Maybe you could consider putting your UDF jar into flink/lib before job submission.
Re: Limit in batch flink sql job
OK, thanks. It might be better to update the document, which has the following example that confused me: SELECT * FROM Orders LIMIT 3
Limit in batch flink sql job
Why does Flink say "Limiting the result without sorting is not allowed as it could lead to arbitrary results" when I use LIMIT in batch mode? SELECT * FROM csvSource LIMIT 10;
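A minimal sketch of the usual fix: in batch Flink SQL the result set has no inherent order, so LIMIT must be paired with ORDER BY to be deterministic (the column name id below is hypothetical):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Rejected by the planner: without a sort, which 10 rows come back is arbitrary.
// Table bad = tableEnv.sqlQuery("SELECT * FROM csvSource LIMIT 10");

// Accepted: ORDER BY makes the limited result well-defined.
Table ok = tableEnv.sqlQuery("SELECT * FROM csvSource ORDER BY id LIMIT 10");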
Getting UnknownTaskExecutorException when I add a new configuration in flink-conf.yaml
Hi Community, I added the item below to flink-conf.yaml, and I see an UnknownTaskExecutorException each time I start Flink on Windows via start-cluster.bat:

fs.s3a.aws.credentials.provider: com.tr.apt.sqlengine.tables.aws.HadoopAWSCredentialsProviderChain

I'm sure that this new configuration is successfully read by Flink and the class is already on the class path, and there is no error if I remove this line. I'm using Flink 1.6.0.

2019-02-01 15:40:57,148 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under d6f5b0f7dd86432ffb515edea31d4f01.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:558)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: Is there a way to get all Flink built-in SQL functions
Thanks, guys. I was just wondering if there is another way besides hard-coding the list :) Thanks anyway.
Is there a way to get all Flink built-in SQL functions
I would like to put this list into our self-service Flink SQL web UI. Thanks.
Re: The way to write a UDF with generic type
Got it, thanks.
Re: How to get the temp result of each dynamic table when executing Flink-SQL?
In our case, we wrote a console table sink which prints everything to the console, and used "insert into" to write the interim result to the console.
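A minimal sketch of such a sink, assuming the Flink 1.6-era Table API (the class name ConsoleTableSink is hypothetical); once registered via tableEnv.registerTableSink it can be the target of an INSERT INTO statement:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class ConsoleTableSink implements AppendStreamTableSink<Row> {
    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.print(); // every row of the dynamic table goes to stdout
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() { return fieldNames; }

    @Override
    public TypeInformation<?>[] getFieldTypes() { return fieldTypes; }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Return a configured copy, as the TableSink contract requires.
        ConsoleTableSink copy = new ConsoleTableSink();
        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }
}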
Re: The way to write a UDF with generic type
Hi Timo, can you let me know how the built-in "MAX" function is able to support different types? Thanks.
Re: The way to write a UDF with generic type
Hi Timo, but getResultType should only return a concrete type information, right? How could I implement it with a generic type? I'd like to clarify my question again. Say I want to implement my own "MAX" function, but I want to apply it to different types, e.g. integer, long, double, etc. So I tried to write a class which extends AggregateFunction with a generic type to implement the max function. Then I want to register only one function name for all types, e.g.

tableEnv.registerFunction("MYMAX", new MyMax());

instead of

tableEnv.registerFunction("MYINTEGERMAX", new MyIntegerMax());
tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
tableEnv.registerFunction("MYDOUBLEMAX", new MyDoubleMax());

Is there a way to implement that? I know the built-in function "MAX" can be applied to all types, so I wonder if I can also implement that. Thanks.
Re: The way to write a UDF with generic type
Hi Chesnay, maybe you misunderstood my question. I have the code below:

public class MyMaxAggregation<T> extends AggregateFunction<T, MyMaxAggregation.MyAccumulator> {
  @Override
  public MyAccumulator createAccumulator() {
    return new MyAccumulator();
  }

  @Override
  public T getValue(MyAccumulator accumulator) {
    return null;
  }

  static class MyAccumulator {
    double maxValue;
  }
}

But tableEnv.registerFunction("MYMAX", new MyMaxAggregation()); throws the exception below:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
    at org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
    at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)
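A hedged sketch of one workaround (Flink 1.6-era API; the class and field names are hypothetical): keep the logic generic, but pass the concrete TypeInformation in through the constructor and override getResultType(), so registration does not depend on automatic type extraction. If extraction also fails on the accumulator, getAccumulatorType() can be overridden the same way.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

public class MyMax<T extends Comparable<T>> extends AggregateFunction<T, MyMax.MaxAcc<T>> {

    public static class MaxAcc<T> {
        public T max; // current maximum; null until the first value arrives
    }

    private final TypeInformation<T> resultType;

    public MyMax(TypeInformation<T> resultType) {
        this.resultType = resultType;
    }

    @Override
    public MaxAcc<T> createAccumulator() {
        return new MaxAcc<>();
    }

    // Discovered by the runtime via reflection.
    public void accumulate(MaxAcc<T> acc, T value) {
        if (value != null && (acc.max == null || value.compareTo(acc.max) > 0)) {
            acc.max = value;
        }
    }

    @Override
    public T getValue(MaxAcc<T> acc) {
        return acc.max;
    }

    @Override
    public TypeInformation<T> getResultType() {
        return resultType; // sidesteps the type-erasure failure
    }
}

This still needs one registration per type, e.g. tableEnv.registerFunction("MYLONGMAX", new MyMax<>(Types.LONG)), but from one class; a single name covering all types appears to be reserved for the planner's built-in functions in this version.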
The way to write a UDF with generic type
Hi Community, I tried to write a UDF with a generic type, but it seems that Flink complains it cannot recognize the type information when I use it in SQL. I checked the implementation of the native function "MAX" and realized that it's not using the same API (AggregateFunction, e.g.) as user-defined functions; is that the reason why "MAX" doesn't have the generic type issue? How can I implement my own "MAX" function which could support all types? Thanks.
Re: Flink SQL client always takes 2 minutes to submit a job to a local cluster
Hi Fabian, it's the submission of the jar file that takes too long. And yes, Hequn's and your suggestion works, but I'm just curious why a 100 MB jar file takes so long to submit; is it related to some upload parameter settings of the web layer?
Re: Flink SQL client always takes 2 minutes to submit a job to a local cluster
I have to do that for now; however, I will have to find another way, because the jar sometimes gets updated and the Flink cluster will be remote in the future.
Flink SQL client always takes 2 minutes to submit a job to a local cluster
I am using Flink 1.6.1. I tried to use the Flink SQL client with some of my own jars via --jar and --library. Executing a SQL query works; however, it always takes around 2 minutes to submit the job to the local cluster. But when I copy my jar to flink/lib and remove the --jar and --library parameters, it submits the job immediately. I debugged and found the 2 minutes are spent by the RestClusterClient sending the request with the jar as payload to the Flink cluster. I don't know why it takes 2 minutes to upload the package. Is there a way to work around it? Thanks.
Re: how to override s3 key config in flink job
It might be difficult, as the task managers and job manager are pre-started in session mode. It seems that the Flink HTTP server will always use the configuration that you specified when you started your Flink cluster, i.e. with start-cluster.sh; I didn't find a way to override it.
Re: how to override s3 key config in flink job
Which Flink version are you using? I know how it works on YARN, but I'm not very clear about standalone mode.
Re: where can I see logs from code
The code running in your main method will output to the Flink CLI log; other code, like map functions etc., will output to the task manager log. Are you saying that you only see Flink code in http://SERVERADD/#/taskmanager/TM_ID/log? It would be useful if you elaborated on your environment.
Re: your advice please regarding state
General approach #1 is OK, but you may have to use some hash-based key selector if you have heavy data skew.
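To illustrate that suggestion, a minimal sketch (the bucket count and the toy string input are hypothetical): tag each record with a rotating bucket, then key by (key, bucket), so one hot key spreads over several parallel subtasks; per-bucket partial results must be merged again downstream.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final int buckets = 16; // hypothetical fan-out per hot key

// "USD" is the hot key in this toy input.
DataStream<String> events = env.fromElements("USD", "USD", "USD", "EUR");

DataStream<Tuple2<String, Integer>> salted = events.map(
    new MapFunction<String, Tuple2<String, Integer>>() {
        private int next = 0;
        @Override
        public Tuple2<String, Integer> map(String key) {
            next = (next + 1) % buckets; // round-robin bucket assignment
            return Tuple2.of(key, next);
        }
    });

salted.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> t) {
        // Deterministic, because the bucket travels with the record.
        return t.f0 + "#" + t.f1;
    }
});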
Re: how to override s3 key config in flink job
Did you try "-Dkey=value"?
Re: TaskManager & task slots
OK, thanks.
Re: About the issue caused by flink's dependency jar package submission method
As far as I know, -yt works for both the job manager and the task managers, and -C works for the Flink CLI. Did you consider putting all your jars in /flink/lib?
Re: TaskManager & task slots
Hi Fabian, does the description below still hold in Flink 1.6? "Slots do not guard CPU time, IO, or JVM memory. At the moment they only isolate managed memory which is only used for batch processing. For streaming applications their only purpose is to limit the number of parallel threads that can be executed by a TaskManager."
Re: Always trigger calculation of a tumble window in Flink SQL
I was able to write a single operator as you suggested, thank you. Then I saw ContinuousProcessingTimeTrigger in the Flink source code; it looks like something I am looking for. If there were a way to customize the SQL "TUMBLE" window to use this trigger instead of ProcessingTimeTrigger, it would solve my problem. Do you know if there is a way to use a custom trigger in the "TUMBLE" window of SQL?
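For reference, what that trigger swap looks like on the DataStream API, since SQL's TUMBLE window does not appear to expose the trigger (the toy Tuple2 input is hypothetical):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Long>> input =
        env.fromElements(Tuple2.of("USD", 1L), Tuple2.of("EUR", 2L));

input.keyBy(0)
     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
     // Fire repeatedly every 10 seconds within the window, instead of only
     // once at the end as the default ProcessingTimeTrigger would.
     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
     .sum(1)
     .print();

Note that this trigger still only fires for windows that received at least one element, so it does not by itself cover the empty-window case discussed below.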
Re: Always trigger calculation of a tumble window in Flink SQL
Hi Piotr, thank you for your explanation. I basically understand your meaning: as far as I understand, we can write a custom window assigner and a custom trigger, and we can register the timer when the window processes elements. But how can we register a timer when no elements are received during a time window? My requirement is to always fire at the end of the time window, even when there is no result from the SQL query.
Re: Always trigger calculation of a tumble window in Flink SQL
Hi Piotr, can you elaborate more on the solution with the custom operator? I don't think there will be any records from the SQL query if no input data comes in within the time window, even if we convert the result to a DataStream.
Always trigger calculation of a tumble window in Flink SQL
We have a requirement to always trigger a calculation on a timer basis, e.g. every 1 minute. If records come into Flink during the time window, then calculate them in the normal way, i.e. aggregate each record and getResult() at the end of the time window. If no records come into Flink during the time window, then send the last calculated result. I know that Flink will not trigger the calculation in the second case (when no records come into the system during the time window); is there a solution for me in Flink SQL?
Re: How to configure TaskManager's JVM options through cmdline?
You can try -yD env.java.opts.taskmanager="-XX:+UseConcMarkSweepGC" if you are running Flink on YARN.
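Equivalently, the option can be set permanently in flink-conf.yaml (the GC flag is just the example from above; env.java.opts.jobmanager covers the JobManager side):

# JVM options for all TaskManager processes
env.java.opts.taskmanager: -XX:+UseConcMarkSweepGC
# env.java.opts.jobmanager: ...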
Re: Reverse the order of fields in Flink SQL
Hi Timo, I wrote some simple test code for the issue; please check out https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1 I wrote a custom table source which just uses RowCsvInputFormat to create the dataset, used the provided CsvTableSink, and can reproduce the issue.
Re: Reverse the order of fields in Flink SQL
I think I have already done that in my custom sink:

@Override
public String[] getFieldNames() {
    return this.fieldNames;
}

@Override
public TypeInformation[] getFieldTypes() {
    return this.fieldTypes;
}

@Override
public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
    this.fieldNames = fieldNames;
    this.fieldTypes = fieldTypes;
    return this;
}
Reverse the order of fields in Flink SQL
I wrote a customized table source which emits some fields, let's say f1, f2. Then I just write to a sink with a reversed order of fields, as below:

select f2, f1 from customTableSource

And I found that it actually doesn't reverse the fields. Then I tried with the Flink-provided CsvTableSource and CsvTableSink, and found that they have no problem reversing the order. After some investigation I found that it's related to two things: 1. CsvTableSource implements ProjectableTableSource 2. RowCsvInputFormat supports selectedFields Do I have to do those two things as well in my custom table source to get the reversal to work?
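For illustration, a minimal sketch of the projection hook involved (Flink 1.6-era API; the class, constructor, and field layout are hypothetical). The planner calls projectFields with the requested field indices, and the source must return a copy that reads only those fields:

import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

public abstract class MyCsvTableSource
        implements BatchTableSource<Row>, ProjectableTableSource<Row> {

    private final String path;
    private final int[] selectedFields; // indices into the full schema

    protected MyCsvTableSource(String path, int[] selectedFields) {
        this.path = path;
        this.selectedFields = selectedFields;
    }

    @Override
    public TableSource<Row> projectFields(int[] fields) {
        // Return a copy limited to the requested fields; the wrapped
        // RowCsvInputFormat would be built with these selectedFields.
        return createCopy(path, fields);
    }

    // Hypothetical factory; the remaining TableSource methods (getDataSet,
    // getReturnType, getTableSchema) must all respect 'selectedFields'.
    protected abstract MyCsvTableSource createCopy(String path, int[] fields);
}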
How to access an FTP server with passive mode
I am using Flink to download and process a big file from a remote FTP server in AWS EMR. As Flink supports the FTP protocol through the Hadoop FTP file system, I use the CsvInputFormat with an FTP address (ftp://user:pass@server/path/file). It works correctly on my local machine, but when I run the job in EMR it fails to establish the connection between the EMR node and the FTP server. After some investigation I found that we have to use FTP passive mode to access the FTP server in AWS, but from the source code of FTPFileSystem it looks like it always uses active mode, and I can't find a way to inject or modify the behavior to use passive mode, although FTPClient has a method to do that. Do you have any suggestions? Thank you.
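One hedged avenue worth checking: if my memory of the ticket is right, newer Hadoop releases (2.9+, via HADOOP-13953) made FTPFileSystem's data connection mode configurable, so passive mode could be requested through the Hadoop configuration rather than a code change. A sketch, assuming a Hadoop version that includes that change:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Only available if the bundled Hadoop includes HADOOP-13953 (2.9+).
conf.set("fs.ftp.data.connection.mode", "PASSIVE_LOCAL_DATA_CONNECTION_MODE");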
Re: User jar is present in the flink job manager's class path
Hi Gary, yes, you are right, we are using the attach mode. I will try to put my jar into flink/lib to get around the issue, thanks. I will open a JIRA for the discrepancy between Flink 1.3 and 1.5. Thanks a lot.
Re: User jar is present in the flink job manager's class path
Meanwhile, I can see the code below in Flink 1.5:

public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
    key("yarn.per-job-cluster.include-user-jar")
        .defaultValue("ORDER")
        .withDescription("Defines whether user-jars are included in the system class path for per-job-clusters as" +
            " well as their positioning in the path. They can be positioned at the beginning (\"FIRST\"), at the" +
            " end (\"LAST\"), or be positioned based on their name (\"ORDER\"). Setting this parameter to" +
            " \"DISABLED\" causes the jar to be included in the user class path instead.");

Does this mean the user jar should always be included in the class path?
Re: User jar is present in the flink job manager's class path
Hi Timo, I didn't try to configure the classloader order; according to the documentation, that should only be needed for yarn-session mode, right? I can see the shipped files (-yt /path/dir/) are present in the job manager's class path, so maybe I should put my uber jar in the -yt path so that it will be shipped and added to the class path in Flink 1.5?
User jar is present in the flink job manager's class path
We have a customized log4j layout implementation, so we need the Flink job manager/task manager to be able to load the logger implementation, which is packaged in the uber jar. However, we noticed that in Flink 1.3 the user jar is put at the beginning of the job manager's class path, but when we do the same again in Flink 1.5, the user jar is not there any more. Is this expected? I saw this in the documentation: "When submitting a Flink job/application directly to YARN (via bin/flink run -m yarn-cluster ...), dedicated TaskManagers and JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in the Java classpath. That means that there is no dynamic classloading involved in that case." And we are using Flink on YARN in per-job mode, so we are confused by what we are experiencing now.
UnsatisfiedLinkError when using flink-s3-fs-hadoop
Hi, I have experienced an UnsatisfiedLinkError when trying to use flink-s3-fs-hadoop to sink to S3 on my local Windows machine. I googled and tried several solutions, like downloading hadoop.dll and winutils.exe, setting up the HADOOP_HOME and PATH environment variables, and copying hadoop.dll to C:\Windows\System32; none of them worked. I also tried to load the Hadoop library myself in the code with System.loadLibrary("hadoop"); that succeeds, but the error still happens. Is there any additional step I am missing? Attached is my code (the 2nd line is just for debugging, and it throws the exception):

public static void main(String[] args) throws Exception {
    System.loadLibrary("hadoop");
    NativeIO.Windows.access(null, NativeIO.Windows.AccessRight.ACCESS_READ);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<String> ds = env.readTextFile("s3://fts-test/test/input.csv");
    ds.print();
    ds.writeAsText("s3://fts-test/test/output.csv");

    FileSystem.initialize(GlobalConfiguration.loadConfiguration("D:\\Technology\\flink\\flink-1.5.3\\conf"));
    env.execute();
}
Re: Facing Issues while migrating to S3A
Hi, I still have the same problem; I googled many approaches but all of them failed. I have downloaded hadoop.dll and winutils.exe and added them to the class path. To verify that this is working, I called System.loadLibrary("hadoop") at the beginning of my Java program, and it succeeded. BTW: I run my program on Windows 7 64-bit.