Re: Trying to implement UpsertStreamTableSink in Java

2018-07-23 Thread Timo Walther

Hi James,

the method `Table.writeToSink()` calls `configure(String[] fieldNames, 
TypeInformation[] fieldTypes)` internally. Since you return null there, 
Flink ends up registering null instead of a configured table sink.
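
For illustration, a configure() that returns a configured sink instead of null could look roughly like this (only a sketch; it assumes MyTableSink keeps the passed field names/types in members that getFieldNames()/getFieldTypes() later return):

@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    // return a copy of the sink that knows the schema of the table being written
    MyTableSink configuredSink = new MyTableSink();
    configuredSink.fieldNames = fieldNames;   // assumed member, backing getFieldNames()
    configuredSink.fieldTypes = fieldTypes;   // assumed member, backing getFieldTypes()
    return configuredSink;
}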


I hope this helps.

Regards,
Timo


Am 23.07.18 um 14:33 schrieb Porritt, James:


I put this class together when trying to create my own upsertable 
table sink in Java:


public class MyTableSink implements UpsertStreamTableSink<Row> {

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames,
            TypeInformation<?>[] fieldTypes) {
        return null;
    }

    @Override
    public void setKeyFields(String[] keys) {
        System.out.println("setKeyFields" + keys);
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
    }

    @Override
    public String[] getFieldNames() {
        return new String[0];
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return new TypeInformation[0];
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return null;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>();
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        dataStream.print();
    }
}

I try and link it to my StreamTable with:

mystreamtable.writeToSink(new MyTableSink());

For some reason though I’m getting the error:

org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error.


at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)


at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)


at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)


at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)


at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)


at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)


at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)


at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)


at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)


at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)

Caused by: org.apache.flink.table.api.TableException: Stream Tables 
can only be emitted by AppendStreamTableSink, RetractStreamTableSink, 
or UpsertStreamTableSink.


at 
org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:284)


at org.apache.flink.table.api.Table.writeToSink(table.scala:862)

at org.apache.flink.table.api.Table.writeToSink(table.scala:830)

at alphagen_stats.KafkaAlphaGen.main(KafkaAlphaGen.java:265)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)


at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)


at java.lang.reflect.Method.invoke(Method.java:498)

   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)


... 12 more

What am I doing wrong?






Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Timo Walther

Hi Mich,

I would check your imports again [1]. This is a pure compiler issue that 
is unrelated to your actual data stream. Also check your project 
dependencies.


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala


Am 01.08.18 um 09:30 schrieb Mich Talebzadeh:


Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by "," as 
below


05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

    val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
SimpleStringSchema(), properties))

 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 
'ticker, 'timeissued, 'price)


note those four columns in Table1 definition

And this is the error being thrown

[info] Compiling 1 Scala source to 
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: 
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to 
(org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 
'key, 'ticker, 'timeissued, 'price)

[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com






On Wed, 1 Aug 2018 at 04:51, Hequn Cheng > wrote:


Hi, Mich

You can try adding "import org.apache.flink.table.api.scala._", so
that the Symbol can be recognized as an Expression.

Best, Hequn

On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh
mailto:mich.talebza...@gmail.com>> wrote:

Hi,

I am following this example


https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

This is my dataStream which is built on a Kafka topic

//
    //Create a Kafka consumer
    //
    val dataStream =  streamExecEnv
   .addSource(new
FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
 //
 //
  val tableEnv =
TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream,
'key, 'ticker, 'timeissued, 'price)

While compiling it throws this error

[error]

/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields: String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:

org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table =
tableEnv.fromDataStream(dataStream, 'key, 'ticker,
'timeissued, 'price)
[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

The topic is very simple, it is comma separated prices. I
tried mapFunction and flatMap but neither worked!

Thanks,


Dr Mich Talebzadeh

LinkedIn

/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/

http://talebzadehmich.wordpress.com



Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Timo Walther
If these two imports are the only imports that you added, then you did 
not follow Hequn's advice or the link that I sent you.


You need to add the underscore imports to let Scala do its magic.
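
For reference, the two wildcard imports a Scala DataStream + Table API program needs are shown below (the table one provides the implicit conversion that turns the 'symbol syntax into Expressions):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._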

Timo


Am 01.08.18 um 10:28 schrieb Mich Talebzadeh:

Hi Timo,

These are my two flink table related imports

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment

And these are my dependencies building with SBT

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% 
"flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% 
"flink-connector-kafka-base" % "1.5.0"

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % 
"1.5.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

There appears to be a conflict somewhere that causes this error

[info] Compiling 1 Scala source to 
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: 
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to 
(org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 
'key, 'ticker, 'timeissued, 'price)

[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Thanks


Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com






On Wed, 1 Aug 2018 at 09:17, Timo Walther <mailto:twal...@apache.org>> wrote:


Hi Mich,

I would check your imports again [1]. This is a pure compiler issue
that is unrelated to your actual data stream. Also check your
project dependencies.

Regards,
Timo

[1]

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala

Am 01.08.18 um 09:30 schrieb Mich Talebzadeh:


Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by
"," as below

05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue,
new SimpleStringSchema(), properties))
 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)

note those four columns in Table1 definition

And this is the error being thrown

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]

/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:

org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api

Re: Converting a DataStream into a Table throws error

2018-08-02 Thread Timo Walther

Whenever you use Scala and there is a Scala-specific class, use it.

remove: import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

add: import org.apache.flink.streaming.api.scala._

This will use 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
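
A minimal sketch of the corrected setup (reusing `topicsValue` and `properties` from your code; imports assume Flink 1.5):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)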


Timo

Am 02.08.18 um 09:47 schrieb Mich Talebzadeh:

Tremendous. Many thanks.

Put the sbt build file and the Scala code here

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com






On Thu, 2 Aug 2018 at 08:27, Timo Walther <mailto:twal...@apache.org>> wrote:


Hi Mich,

could you share your project with us (maybe on github)? Then we
can import it and debug what the problem is.

Regards,
Timo

Am 02.08.18 um 07:37 schrieb Mich Talebzadeh:

Hi Jorn,

Here you go the dependencies

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %%
"flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %%
"flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" %
"0.11.0.0"
libraryDependencies += "org.apache.flink" %%
"flink-streaming-scala" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-table" %
"1.5.0" % "provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

Thanks

Dr Mich Talebzadeh

LinkedIn

/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/

http://talebzadehmich.wordpress.com





On Thu, 2 Aug 2018 at 06:19, Jörn Franke mailto:jornfra...@gmail.com>> wrote:


How does your build.sbt looks especially dependencies?
On 2. Aug 2018, at 00:44, Mich Talebzadeh
mailto:mich.talebza...@gmail.com>> wrote:


Changed as suggested

   val streamExecEnv =
StreamExecutionEnvironment.getExecutionEnvironment
     val dataStream =  streamExecEnv
   .addSource(new
FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
  val tableEnv =
TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("table1", streamExecEnv, 'key,
'ticker, 'timeissued, 'price)

Still the same error

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]

/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
overloaded method value registerDataStream with alternatives:
[error]   [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields: String)Unit 
[error]   [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])Unit
[error]  cannot be applied to (String,
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
Symbol, Symbol, Symbol, Symbol)
[error] tableEnv.registerDataStream("table1", streamExecEnv,
'key, 'ticker, 'timeissued, 'price)
[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s,

Re: Late events in streaming using SQL API

2018-08-02 Thread Timo Walther

Hi Juan,

currently, there is no way of handling late events in SQL. This feature 
got requested multiple times so it is likely that some contributor will 
pick it up soon. I filed FLINK-10031 [1] for it. There is also [2] that 
aims for improving the situation with time windows.


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10031
[2] https://issues.apache.org/jira/browse/FLINK-6969
Am 02.08.18 um 14:36 schrieb Juan Gentile:


Hello,

We are using the SQL api and we were wondering if it’s possible to 
capture and log late events. We could not find a way considering the 
time window is managed inside the SQL.


Is there a way to do this?

Thank you,

Juan





Re: Sink Multiple Stream Elastic search

2018-08-02 Thread Timo Walther

Hi,

I'm not aware that multiple Flink operators can share transport 
connections. They usually perform independent communication with the 
target system. If the pressure on Elasticsearch is too high, have you 
thought about reducing the parallelism of the sink? The buffering 
options of the bulk processor could also help [1].
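
A rough sketch (Scala; the cluster name, host, `stream` and `mySinkFunction` are placeholders) of tuning the bulk processor and lowering the sink parallelism for the Elasticsearch 5 connector:

import java.net.{InetAddress, InetSocketAddress}
import java.util
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

val config = new util.HashMap[String, String]()
config.put("cluster.name", "my-es-cluster")      // placeholder
config.put("bulk.flush.max.actions", "1000")     // buffer up to 1000 actions per bulk request
config.put("bulk.flush.interval.ms", "5000")     // or flush at least every 5 seconds

val transports = new util.ArrayList[InetSocketAddress]()
transports.add(new InetSocketAddress(InetAddress.getByName("es-host"), 9300))  // placeholder

stream
  .addSink(new ElasticsearchSink(config, transports, mySinkFunction))  // mySinkFunction: your ElasticsearchSinkFunction
  .setParallelism(2)  // fewer sink subtasks -> fewer transport connections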


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/elasticsearch.html#configuring-the-internal-bulk-processor


Am 02.08.18 um 13:51 schrieb shashank734:

Hello,

I am using the Elasticsearch 5 connector. Can I use the same connection while
sinking multiple streams to Elasticsearch? Currently, I think it creates a
different transport connection for each sink, which means a lot of
connections to the cluster, because I am sinking 5-6 streams.









Re: FlinkCEP and scientific papers ?

2018-08-07 Thread Timo Walther

Hi Esa,

the SQL/CEP integration might be part of Flink 1.7. The discussion has 
just been started again [1].


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-6935

Am 07.08.18 um 15:36 schrieb Esa Heikkinen:


There was one good example of a pattern query in the paper, written in the 
SASE+ language (in attachment).

Could you easily say how to do that in FlinkCEP with Scala? Or is it 
possible?

That SQL and CEP integration would also be very interesting, but when will 
it be ready to use?


BR Esa

*From:*vino yang 
*Sent:* Monday, July 23, 2018 3:00 PM
*To:* Esa Heikkinen 
*Cc:* Till Rohrmann ; Chesnay Schepler 
; user 

*Subject:* Re: FlinkCEP and scientific papers ?

Hi Esa,

I think the core implementation pattern is still based on that paper; 
there is a package named "nfa" [1] that contains the main ideas.

The latest CEP module added more features and enhanced the old version. 
What's more, there is FLIP-20, which has been accepted; it describes 
how to integrate SQL and CEP [2].


I think there is no newer paper related to current Flink CEP.

[1]: 
https://github.com/apache/flink/tree/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa


[2]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP


Thanks, vino.

2018-07-23 16:05 GMT+08:00 Esa Heikkinen >:


Hi

Thank you. This was a very good paper for me :)

How much does the current FlinkCEP work like this (the paper was written
in 2008)?

Do there exist newer papers related to the current FlinkCEP?

BR Esa

*From:*Till Rohrmann mailto:trohrm...@apache.org>>
*Sent:* Wednesday, July 18, 2018 9:38 AM
*To:* vino yang mailto:yanghua1...@gmail.com>>
*Cc:* Esa Heikkinen mailto:esa.heikki...@student.tut.fi>>; Chesnay Schepler
mailto:ches...@apache.org>>; user
mailto:user@flink.apache.org>>
*Subject:* Re: FlinkCEP and scientific papers ?

You are right Vino,

the initial implementation was based on the above mentioned paper.

Cheers,

Till

On Tue, Jul 17, 2018 at 5:34 PM vino yang mailto:yanghua1...@gmail.com>> wrote:

Hi Esa,

AFAIK, the earlier Flink CEP refers to the paper "Efficient
Pattern Matching over Event Streams" [1]. Flink absorbed two
major ideas from this paper:

1. the NFA-b model on event streams

2. a shared versioned match buffer, which is an optimized data
structure

To Till and Chesnay:

Did I miss anything, given how time has gone on and Flink has
developed? If yes, please add your remarks.

[1]:
https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf


Thanks, vino.

2018-07-17 22:01 GMT+08:00 Esa Heikkinen
mailto:esa.heikki...@student.tut.fi>>:

Hi

I don't know if this is the correct forum to ask, but do there
exist some good scientific papers about FlinkCEP (Complex
Event Processing)?

I know Flink is based on Stratosphere, but what about
FlinkCEP?

BR Esa





Re: unsubscribtion

2018-08-07 Thread Timo Walther

Hi,

see https://flink.apache.org/community.html#mailing-lists for unsubscribing:

Use:

user-unsubscr...@flink.apache.org

Regards,
Timo



Am 08.08.18 um 08:18 schrieb 네이버:



On 7 Aug 2018, at 19:42, Yan Zhou [FDS Science] > wrote:



Thank you Vino. It is very helpful.


*From:* vino yang mailto:yanghua1...@gmail.com>>
*Sent:* Tuesday, August 7, 2018 7:22:50 PM
*To:* Yan Zhou [FDS Science]
*Cc:* user
*Subject:* Re: checkpoint recovery behavior when kafka source is set 
to start from timestamp

Hi Yan Zhou:

I think the java doc of the setStartFromTimestamp method has been 
explained very clearly, posted here:


/**
 * Specify the consumer to start reading partitions from a specified timestamp.
 * The specified timestamp must be before the current timestamp.
 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 *
 * The consumer will look up the earliest offset whose timestamp is greater than or equal
 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
 * latest offset to read data from kafka.
 *
 * This method does not affect where partitions are read from when the consumer is restored
 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
 * savepoint, only the offsets in the restored state will be used.
 *
 * @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch.
 *
 * @return The consumer object, to allow function chaining.
 */
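
A small usage sketch (Scala; the topic, properties and timestamp are placeholders) that matches the documented behaviour — the timestamp is only used for the very first start, a restore uses the stored offsets:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()  // broker / group settings go here

val consumer = new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), properties)
// Only used for the very first start of the job; after a restore from a
// checkpoint or savepoint, the offsets stored in the restored state win.
consumer.setStartFromTimestamp(1533600000000L)

val stream = env.addSource(consumer)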

Thanks, vino.

Yan Zhou [FDS Science] <yz...@coupang.com> wrote on Wed, Aug 8, 2018 at 9:06 AM:


Hi Experts,


In my application, the kafka source is set to start from a
specified timestamp, by calling method
FlinkKafkaConsumer010#setStartFromTimestamp(long
startupOffsetsTimestamp).


If the application have run a while and then recover from a
checkpoint because of failure, what's the offset will the kafka
source to read from? I suppose it will read from the offset that
has been committed before the failure. Is it right?


I am going to verify it, however some clarification is good in
case my test result doesn't meet my assumption.


Best

Yan






Re: Table API, custom window

2018-08-09 Thread Timo Walther

Hi Oleksandr,

currently, we don't support custom windows for the Table API. The Table & 
SQL API tries to solve the most common cases, but for more specific logic 
we recommend the DataStream API.


Regards,
Timo

Am 09.08.18 um 14:15 schrieb Oleksandr Nitavskyi:


Hello guys,

I am curious, is there a way to define custom windows 
(assigners/triggers/evictors) for the Table/SQL Flink API? The 
documentation seems to keep silent about this, but are there plans for it? 
Or should we go with the DataStream API in case we need that kind of 
functionality?


Thanks

Oleksandr Nitavskyi





Re: Getting compilation error in Array[TypeInformation]

2018-08-09 Thread Timo Walther

Hi Mich,

I strongly recommend reading a good Scala programming tutorial before 
writing to the mailing list.

As the error indicates, you are missing generic parameters. If you don't 
know the parameter, use `Array[TypeInformation[_]]` or `TableSink[_]`. 
For the Types constants you need to import the class 
"org.apache.flink.table.api.Types".

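A corrected version of the declarations below could look like this (a sketch; `writeDirectory`, `fileName` and `tableEnv` come from the original snippet, and Types.FLOAT is assumed for the price column):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}

val fieldNames: Array[String] = Array("key", "ticker", "timeissued", "price")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING, Types.FLOAT)
val sink: TableSink[_] = new CsvTableSink(writeDirectory + fileName, fieldDelim = ",")
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)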

Regards,
Timo


Am 09.08.18 um 17:18 schrieb Mich Talebzadeh:

This is the code in Scala

   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 
'timeissued, 'price)
    val result = tableEnv.scan("priceTable").filter('ticker === "VOD" 
&& 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)


    val fieldNames: Array[String] = Array("key", "ticker", 
"timeissued", "price")
    val fieldTypes: Array[TypeInformation] = Array(Types.STRING, 
Types.STRING, Types.STRING, Types.Float)
    val sink: TableSink = new CsvTableSink(writeDirectory+fileName, 
fieldDelim = ",")
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, 
sink)

    result.insertInto("CsvSinkTable")

When compiling I get the following error

[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
class TypeInformation takes type parameters
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error]   ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error]    ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error] ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error] ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: 
not found: value Types
[error] val fieldTypes: Array[TypeInformation] = 
Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)

[error] ^
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:172: 
trait TableSink takes type parameters
[error] val sink: TableSink = new 
CsvTableSink(writeDirectory+fileName, fieldDelim = ",")

[error]   ^
[error] 6 errors found

May be I am not importing the correct dependencies.

Thanks

Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com








Re: SQL parallelism setting

2018-08-10 Thread Timo Walther

Hi,

currently, you can only set the parallelism for an entire Flink job 
using env.setParallelism().
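
A minimal sketch (Scala) of what that looks like:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)  // applies to the whole job, including operators generated by sqlQuery()
val tEnv = TableEnvironment.getTableEnvironment(env)
val result = tEnv.sqlQuery("SELECT ...")  // placeholder query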


There are rough ideas of how we could improve the situation in the 
future to control the parallelism of individual operators but this might 
need one or two releases.


Regards,
Timo

Am 10.08.18 um 08:54 schrieb Shu Li Zheng:

Hi community,

Is there a way to change parallelism on sqlQuery()?

Regards,

Shu li Zheng





Re: Stream collector serialization performance

2018-08-15 Thread Timo Walther

Hi Mingliang,

first of all, the POJO serializer is not very performant; Tuple or Row 
are better. If you want to improve the performance of a collect() 
between operators, you could also enable object reuse. You can read more 
about this here [1] (section "Issue 2: Object Reuse"), but make sure 
your implementation is correct, because an operator could modify the 
objects of following operators.
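
A minimal sketch (Scala) of enabling it:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Reuses record instances between chained operators instead of deep-copying them.
// Only safe if downstream operators do not cache or mutate the records they receive.
env.getConfig.enableObjectReuse()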


I hope this helps.

Regards,
Timo

[1] 
https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime



Am 15.08.18 um 09:06 schrieb 祁明良:

Hi all,

I'm currently using a keyed process function, and I see there is serialization 
happening when I collect the object / update the object to RocksDB. For me the 
performance of serialization seems to be the bottleneck.
By default, the POJO serializer is used, and the time cost of collect / update to 
RocksDB is roughly 1:1. Then I switched to Kryo by setting 
getConfig.enableForceKryo(). Now the time cost of the update to RocksDB decreases 
significantly to roughly 0.3, but the collect method does not seem to improve. Can 
someone help explain this?

My object looks somewhat like this:

Class A {
    String f1   // 20 * string fields
    List<B> f2  // 20 * list of another POJO object
    Int f3      // 20 * int fields
}
Class B {
    String f    // 5 * string fields
}

Best,
Mingliang






Re: Scala 2.12 Support

2018-08-16 Thread Timo Walther

Hi Aaron,

we just released Flink 1.6 and the discussion about the roadmap for 1.7 
will begin soon. I guess the Jira issue will also be updated then. I would 
recommend watching it for now.


Regards,
Timo


Am 16.08.18 um 17:08 schrieb Aaron Levin:

Hi Piotr,

Thanks for the update. Glad to hear it's high on the priority list! 
I'm looking forward to the 1.7 update!


It may be worth having someone more official from the Flink team give 
an update on that ticket. It wasn't clear if the 1.7 comment from that 
user was just a reference to the fact that 1.6 had come out (or where 
they got that information). I know a few people have cited the ticket 
and concluded "not clear what's going on with Scala 2.12 support." If 
you have the bandwidth, a note from you or anyone else would be helpful!


Thanks again!

Best,

Aaron Levin

On Thu, Aug 16, 2018 at 6:04 AM, Piotr Nowojski 
mailto:pi...@data-artisans.com>> wrote:


Hi,

Scala 2.12 support is high on our priority list and we hope to
have it included for the 1.7 release (as you can see in the ticket
itself), which should happen later this year.

Piotrek



On 15 Aug 2018, at 17:59, Aaron Levin mailto:aaronle...@stripe.com>> wrote:

Hello!

I'm wondering if there is anywhere I can see Flink's roadmap for
Scala 2.12 support. The last email I can find on the list for
this was back in January, and the FLINK-7811[0], the ticket
asking for Scala 2.12 support, hasn't been updated in a few months.

Recently Spark fixed the ClosureCleaner code to support Scala
2.12[1], and from what I can gather this was one of the main
barrier for Flink supporting Scala 2.12. Given this has been
fixed, is there work in progress to support Scala 2.12? Any
updates on FLINK-7811?

Thanks for your help!

[0] https://issues.apache.org/jira/browse/FLINK-7811

[1] https://issues.apache.org/jira/browse/SPARK-14540


Best,

Aaron Levin







Re: InvalidTypesException: Type of TypeVariable 'K' in 'class X' could not be determined

2018-08-17 Thread Timo Walther

Hi Miguel,

the issue that you are observing is due to Java's type erasure.

"new MyClass<Long>()" is always erased to "new MyClass()" by the 
Java compiler, so it is impossible for Flink to extract anything.


For classes in declarations like

class MyClass extends ... {
   ...
}

the compiler retains the actual generic and Flink can extract it. So for 
class declarations the generics remain, but generics passed to new expressions are erased.
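
One common way around this (a sketch, not the only option) is to pass the type information in from the call site, where the concrete type argument is still written out:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class X<K> {
    private final TypeInformation<K> keyType;

    public X(TypeInformation<K> keyType) {
        this.keyType = keyType;  // concrete type information supplied by the caller
    }

    public static void main(String[] args) {
        // at the call site the type argument is known, so it can be captured explicitly
        X<Long> x = new X<>(TypeInformation.of(new TypeHint<Long>(){}));
    }
}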


Regards,
Timo

Am 16.08.18 um 22:28 schrieb Miguel Coimbra:

Hello,

I have some code which compiles correctly (Flink 1.4) under Java 8.
It uses generic types.
While it compiles correctly, the execution fails with the error:

org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'K' in 'class X' could not be determined.


This is my main:

public static void main(final String[] args) {
    X<Long> x = new X<>();
}

This is my class X:

public class X<K> {
    public X() {
        TypeInformation<K> keySelector = TypeInformation.of(new TypeHint<K>(){});
    }
}


Perhaps I'm lacking knowledge of the way Java's generics work, but why 
can't Flink determine the TypeVariable 'K'?
As I am instantiating X parameterized as Long, that information 
should eventually reach Flink and the constructor of X would be 
equivalent to this:

public X() { TypeInformation<Long> keySelector = TypeInformation.of(new TypeHint<Long>(){}); }

During execution, however, this error pops up.
What am I missing here, and what is the best way to achieve this 
generic behavior in a Flink-idiomatic way?


Thank you very much for your time.





Re: Override CaseClassSerializer with custom serializer

2018-08-17 Thread Timo Walther

Hi Gerard,

you are correct, Kryo serializers are only used when no built-in Flink 
serializer is available.


Actually, the tuple and case class serializers are among the most 
performant serializers in Flink (due to their fixed length and lack of null 
support). If you really want to reduce the serialization overhead you 
could look into the object reuse mode. We had this topic on the mailing 
list recently; I will just copy it here:

If you want to improve the performance of a collect() between operators, 
you could also enable object reuse. You can read more about this here 
[1] (section "Issue 2: Object Reuse"), but make sure your implementation 
is correct, because an operator could modify the objects of following 
operators.


I hope this helps.

Regards,
Timo

[1] 
https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime


Am 17.08.18 um 17:29 schrieb gerardg:

Hello,

I can't seem to be able to override the CaseClassSerializer with my custom
serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the
custom serializer, but I don't see it being used. I guess that is because
Kryo-based serializers are only used when no Flink serializer can be found?

Is it then worth it to replace the CaseClassSerializer with a custom
serializer? (When I profile, the CaseClassSerializer.(de)serialize method
appears as the most used, so I wanted to give it a try.) If so, how can I do
it?

Thanks,

Gerard








Re: lack of function and low usability of provided function

2018-08-23 Thread Timo Walther

Hi Henry,

thanks for giving feedback. The set of built-in functions is a continuous 
effort that will never be considered "done". If you think a function 
should be supported, you can open issues under FLINK-6810 and we can 
discuss their priority.

Flink is an open source project, so also feel free to contribute, either 
by opening issues, reviewing existing PRs, or contributing code.


Regards,
Timo


Am 23.08.18 um 10:01 schrieb vino yang:

Hi Henry,

I recently submitted some PRs about Scalar functions, some of which 
have been merged and some are being reviewed, and some may be what you 
need.


Log2(x): https://issues.apache.org/jira/browse/FLINK-9928, will be released in Flink 1.7
exp(x): already exists, see https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html
regular expression support: use SIMILAR TO, also see https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html
regex extract: https://issues.apache.org/jira/browse/FLINK-9990 (in review)
regex replace: https://issues.apache.org/jira/browse/FLINK-9991 (in review)

The status of most scalar functions can be seen here : 
https://issues.apache.org/jira/browse/FLINK-6810


Thanks, vino.


徐涛 <happydexu...@gmail.com> wrote on Thu, Aug 23, 2018 at 3:16 PM:


Hi All,
        I found Flink lacks some basic functions, for example string
split, regular expression support, and JSON parse/extract support. These
functions are used frequently in development, but they are not supported,
so users have to write UDFs for them.
        Also, some of the provided functions lack usability; for example
log(2, 1.0) and exp(1.0) with double parameters are not supported. I think
they are not hard to implement and they are very basic functions.
        Will Flink enhance the basic functions, maybe in later
releases?

Best,
Henry





Re: AvroSchemaConverter and Tuple classes

2018-08-24 Thread Timo Walther

Hi,

tuples are just a subcategory of rows, because the tuple arity is 
limited to 25 fields. I think the easiest solution would be to write 
your own converter that maps rows to tuples if you know that you will 
not need more than 25 fields. Otherwise it might be easier to just use a 
TextInputFormat and do the parsing yourself with a library.
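
For a two-field record like the one in your example (an int and a string), such a converter could be as simple as this sketch (field positions and types follow the converted TypeInformation):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class RowToTuple {
    public static Tuple2<Integer, String> toTuple2(Row row) {
        // positions follow the field order of the Avro schema / TypeInformation
        return Tuple2.of((Integer) row.getField(0), (String) row.getField(1));
    }
}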


Regards,
Timo


Am 23.08.18 um 18:54 schrieb françois lacombe:

Hi all,

I'm looking for best practices regarding Tuple instance creation.

I have a TypeInformation object produced by 
AvroSchemaConverter.convertToTypeInfo("{...}");
Is it possible to define a corresponding Tuple instance with it? 
(i.e. get the T from the TypeInformation)

Example:
{
  "type": "record",
  "fields": [
    { "name": "field1", "type": "int" },
    { "name": "field2", "type": "string"}
]}
 = Tuple2<Integer, String>

The same question arises with DataSet or any other record-handling 
class with parametrized types.


My goal is to parse several CsvFiles with different structures 
described in an Avro schema.
It would be great to not hard-code structures in my Java code and only 
get types information at runtime from Avro schemas


Is this possible?

Thanks in advance

François Lacombe





Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-30 Thread Timo Walther

Hi François,

you should read the documentation from top to bottom. The overview part 
[1] explains how everything plays together with examples.
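
Putting the pieces together for the CSV case, the call chain looks roughly like this (a sketch, assuming the Flink 1.6 descriptor API; the path, field names and types are placeholders):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

tEnv
    .connect(new FileSystem().path("/path/to/file.csv"))  // 1. define the connector first
    .withFormat(new Csv()                                 // 2. then the format
        .field("field1", Types.INT)
        .field("field2", Types.STRING))
    .withSchema(new Schema()                              // 3. then the table schema
        .field("field1", Types.INT)
        .field("field2", Types.STRING))
    .registerTableSource("csvTable");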


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#overview


Am 30.08.18 um 10:41 schrieb Till Rohrmann:

Hi François,

as Vino said, the BatchTableEnvironment does not provide a 
`withFormat` method. Admittedly, the documentation does not state it 
too explicitly but you can only call the `withFormat` method on a 
table connector as indicated here [1]. If you think that you need to 
get the data from somewhere first before defining a format, then it 
becomes clear that you first need to define a connector.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#table-formats


Cheers,
Till

On Thu, Aug 30, 2018 at 4:46 AM vino yang > wrote:


Hi francois,

Maybe you can refer to the comments of this source code?[1]


https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala#L143

Thanks, vino.

françois lacombe <francois.laco...@dcbrain.com> wrote on Wed, Aug 29, 2018 at 10:54 PM:

Hi Vino,

Thanks for this answer.
I can't find in the docs where it's about BatchTableDescriptor

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

It sounds like the withFormat method is applied on
TableEnvironment object on this page.

All the best

François

2018-08-28 4:37 GMT+02:00 vino yang mailto:yanghua1...@gmail.com>>:

Hi Francois,

Yes, the withFormat API comes from an instance of
BatchTableDescriptor, and the BatchTableDescriptor
instance is returned by the connect API, so you should
call BatchTableEnvironment#connect first.

Thanks, vino.

françois lacombe <francois.laco...@dcbrain.com> wrote on Mon, Aug 27, 2018 at 10:26 PM:

Hi all,

I'm currently trying to load a CSV file content with
Flink 1.6.0 table API.
This error is raised as a try to execute the code
written in docs

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv =
TableEnvironment.getTableEnvironment(env);
tEnv.withFormat(new Csv(...));

> Exception in thread "main" java.lang.Error:
Unresolved compilation problem:
   The method withFormat(Csv) is undefined for the
type BatchTableEnvironment

Am I wrong?

Thanks in advance for any hint

François






Re: ElasticSearch 6 - error with UpdateRequest

2018-08-30 Thread Timo Walther

Hi,

thanks for your feedback. I agree that the current interfaces are 
not flexible enough to fit every use case. The unified connector API 
is a very recent feature that still needs some polishing. I'm working 
on a design document to improve the situation there.


For now, you can simply implement some utility method that just 
iterates over the column names and types of the TableSchema and calls 
`schema.field(name, type)`.


I hope this helps.

Regards,
Timo


Am 31.08.18 um 08:10 schrieb Averell:

Good day everyone,

I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
error:

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)

Below is my ElasticsearchSinkFunction:

import org.elasticsearch.action.update.UpdateRequest
def upsertRequest(element: T): UpdateRequest = {
new UpdateRequest(
"myIndex",
"record",
s"${element.id}")
.doc(element.toMap())
}
override def process(element: T, runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit = {
requestIndexer.add(upsertRequest(element))
}

What could be the issue here?

Thanks for your help.

Regards,
Averell








Re: Table API: get Schema from TableSchema, TypeInformation[] or Avro

2018-08-30 Thread Timo Walther

Hi,

thanks for your feedback. I agree that the current interfaces are 
not flexible enough to fit every use case. The unified connector API 
is a very recent feature that still needs some polishing. I'm working 
on a design document to improve the situation there.


For now, you can simply implement some utility method that just 
iterates over the column names and types of the TableSchema and calls 
`schema.field(name, type)`.
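
A rough sketch of such a utility (assuming the Flink 1.6 TableSchema and descriptor Schema classes):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.Schema;

public class SchemaUtil {
    public static Schema toSchemaDescriptor(TableSchema tableSchema) {
        Schema schema = new Schema();
        String[] names = tableSchema.getColumnNames();
        TypeInformation<?>[] types = tableSchema.getTypes();
        for (int i = 0; i < names.length; i++) {
            schema.field(names[i], types[i]);  // copy each column into the descriptor
        }
        return schema;
    }
}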


I hope this helps.

Regards,
Timo

Am 31.08.18 um 07:40 schrieb françois lacombe:

Hi all,

Today I'm looking into deriving a Schema object from an Avro schema JSON string.
In the overview of 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html 
Avro is used as a format and never as a schema.


This was a topic in JIRA-9813
I can get a TableSchema with TableSchema schema = 
TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(sch_csv.toString())); 
but I can't use it with BatchTableDescriptor.withSchema().


How can I get a Schema from a TableSchema, TypeInformation[] or even an 
Avro JSON string?
A little bridge seems to be missing between TableSchema and 
org.apache.flink.table.descriptors.Schema.


Thanks in advance for any useful hint

François





Re: ElasticSearch 6 - error with UpdateRequest

2018-08-30 Thread Timo Walther

Hi Averell,

sorry for my wrong other mail.

I also observed this issue when implementing FLINK-3875. Currently, 
update requests are broken due to a binary incompatibility. I already 
have a fix for this in a different branch. I opened FLINK-10269 [1] to 
track the issue.


As a workaround you can simply copy 
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer 
to your project. This should ensure that the class is compiled 
correctly. If it doesn't help, please let us know.


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10269

Am 31.08.18 um 08:10 schrieb Averell:

Good day everyone,

I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
error:

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)

Below is my ElasticsearchSinkFunction:

import org.elasticsearch.action.update.UpdateRequest
def upsertRequest(element: T): UpdateRequest = {
new UpdateRequest(
"myIndex",
"record",
s"${element.id}")
.doc(element.toMap())
}
override def process(element: T, runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit = {
requestIndexer.add(upsertRequest(element))
}

What could be the issue here?

Thanks for your help.

Regards,
Averell








Re: Table API: get Schema from TableSchema, TypeInformation[] or Avro

2018-08-31 Thread Timo Walther
Thanks for your response. I think we won't need this utility in the near 
future. As mentioned, I'm working on a design document that allows for 
better abstraction. I think I will publish it next week.


Regards,
Timo


Am 31.08.18 um 08:36 schrieb françois lacombe:

Hi Timo

Yes it helps, thank you.
I'll start building such a utility method. Are you interested in getting 
the source?


According to mapping here : 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#apache-avro-format
Is there any way to get corresponding TypeInformation of an Avro 
type or should I hard code a List>?


All the best

François

2018-08-31 8:12 GMT+02:00 Timo Walther <mailto:twal...@apache.org>>:


Hi,

thanks for your feedback. I agree that the current interfaces
are not flexible enough to fit every use case. The unified
connector API is a very recent feature that still needs some
polishing. I'm working on a design document to improve the
situation there.

For now, you can simply implement some utility method that just
iterates over the column names and types of the TableSchema and calls
`schema.field(name, type)`.

I hope this helps.

Regards,
Timo

Am 31.08.18 um 07:40 schrieb françois lacombe:

Hi all,

Today I'm looking into derivating an Avro schema json string
into a Schema object.
In the overview of

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html

<https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html>
Avro is used as a format and never as a schema.

This was a topic in JIRA-9813
I can get a TableSchema with TableSchema schema =

TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(sch_csv.toString()));
but I can't use it with BatchTableDescriptor.withSchema().

How can I get a Schema from TableSchema, TypeInformation[]
or even Avro json string?
A little bridge is missing between TableSchema and
org.apache.flink.table.descriptors.Schema it seems.

Thanks in advance for any useful hint

François








Re: ElasticSearch 6 - error with UpdateRequest

2018-08-31 Thread Timo Walther
The problem is that BulkProcessorIndexer is located in 
flink-connector-elasticsearch-base, which is compiled against a very old 
ES version. This old version is source-code compatible but apparently 
not binary compatible with newer Elasticsearch classes. By copying this 
class you force it to be compiled against ES 6 and don't use the old 
class from the base module.


The fix will include to improve the API call bridge. As done here [1].

Regards,
Timo

[1] https://github.com/apache/flink/pull/6611


Am 31.08.18 um 09:06 schrieb Averell:

Hi Timo,

Thanks for your help. I don't get that error anymore after putting that file
into my project.
However, I don't understand how it could help. I have been using the Flink
binary built on my same laptop, then how could it be different between
having that java class in Flink project vs in my project?
If you have some spare time, please help explain.

I also would like to know the other way to fix that issue (that you
implemented in your branch).

Thanks a lot for your help.
Regards,
Averell









Re: Orc Sink Table

2018-09-13 Thread Timo Walther

Hi Jose,

you have to add additional Maven modules depending on the 
connector/format you want to use. See this page [1] for more information.


Feel free to ask further questions if the description is not enough for you.

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#further-tablesources-and-tablesinks



Am 13.09.18 um 11:50 schrieb jose farfan:

Hi

I am checking the documentation

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html#register-a-tablesink


  Register a TableSink

A registered TableSink can be used to emit the result of a Table API 
or SQL query to an external storage system, such as a database, 
key-value store, message queue, or file system (in different encodings, 
e.g., CSV, Apache Parquet, Avro, ORC, ...).

Flink aims to provide TableSinks for common data formats and storage 
systems. Please see the documentation about the Table Sources and Sinks 
page for details about available sinks and instructions for how to 
implement a custom TableSink.



You can read there that we can define different encodings: CSV, ORC, etc.

But in the source code, I can only find CsvTableSink.

How can I get an OrcTableSink? Do I need to extend TableSinkBase, 
or is there another place to find that implementation?



BR

Jose






Re: Conversion to relational algebra failed to preserve datatypes

2018-09-14 Thread Timo Walther

Hi,

could you maybe post the query that caused the exception? I guess the 
exception is related to a time attribute [1]. For the optimizer, time 
attributes and timestamps make no difference; however, they have a 
slightly different data type, which might have caused the error. I think 
this is a bug that should be fixed once we have more context.


Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#time-attributes



Am 14.09.18 um 10:49 schrieb Yuan,Youjun:


Hi,

I am getting the following error while submitting a job to a cluster. 
It seems to fail when comparing 2 RelDataTypes, though they seem 
identical (from the error message), and everything is OK if I run it 
locally.


I guess calcite failed to compare the first field named *ts*, of type 
*TIMESTAMP(3)*, because:


  * If I don’t select ts, then everything goes fine
  * If I cast ts to other type, like SELECT cast(ts AS TIMESTAMP),
then everything is fine
  * If I switch to EventTime, the issue also goes away. Currently it’s
ProcessTime

I am using Flink 1.4, and submitting job to a standalone cluster.

Below are the error:



Caused by: org.apache.flink.client.program.ProgramInvocationException: 
The program caused an error:


at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93) 



at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334) 



at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76) 



at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69) 
... 9 more


Caused by: java.lang.AssertionError: Conversion to relational algebra 
failed to preserve datatypes:


validated type:

RecordType(TIMESTAMP(3) NOT NULL ts, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" userId, VARCHAR(65536) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" batchId, 
VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" projectId, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" vehicleId, CHAR(5) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL 
field0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" rule0, DOUBLE threshold0, DOUBLE 
field_value0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" alarmId) NOT NULL


converted type:

RecordType(TIMESTAMP(3) NOT NULL ts, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" userId, VARCHAR(65536) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" batchId, 
VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" projectId, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" vehicleId, CHAR(5) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL 
field0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" rule0, DOUBLE threshold0, DOUBLE 
field_value0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" alarmId) NOT NULL


...

at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:451) 



at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:567) 



at 
org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:106)




thanks in advance,

youjun





Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-26 Thread Timo Walther

Hi,

actually it should not be necessary to put the flink-json format into 
/lib. Is the class `org.apache.flink.formats.json.JsonRowFormatFactory` 
present in the jar file you are creating with Maven? There should also 
be an entry in 
`META-INF/services/org.apache.flink.table.factories.TableFactory` for 
this class.


Thanks for reporting this.

Regards,
Timo


Am 26.09.18 um 06:25 schrieb clay:

hi Till:

I have solved the problem.
The reason is that the flink-json dependency added to the pom didn't work;
you must copy the flink-json-xxx.jar to the Flink ./lib/ path.
...








Re: OpenSSL use in Flink

2018-09-26 Thread Timo Walther

Hi Suchithra,

did you take a look at the documentation [1] about the SSL setup?

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html


Am 26.09.18 um 14:08 schrieb V N, Suchithra (Nokia - IN/Bangalore):


Hello,

I have a query regarding OpenSSL usage in Flink. Please let me know if 
Flink uses OpenSSL and SSL_CTX API’s.


Thanks,

Suchithra





Re: About the retract of the calculation result of flink sql

2018-10-01 Thread Timo Walther

Hi,

you also need to keep the parallelism in mind. If your downstream 
operator or sink has a parallelism of 1 and your SQL query pipeline has 
a higher parallelism, the retract results are rebalanced and arrive in a 
wrong order. For example, if you view the changelog in SQL Client, the 
built-in SQL Client sink has always parallelism 1.


Regards,
Timo



Am 29.09.18 um 17:02 schrieb Hequn Cheng:

Hi clay,

Are there any other lines after the last line in your picture? The 
final result should be eventually consistent and correct.

In your SQL, there is a left join, a keyed group by, and a non-keyed 
group by. Both the left join and the keyed group by will send 
retractions to the downstream non-keyed group by once there is an 
update. The retraction messages make the result value fluctuate; however, the 
final result will be correct.
To get monotonic results, you can add another non-keyed group by with 
max.


Best, Hequn.


On Sat, Sep 29, 2018 at 3:47 PM clay > wrote:


My final calculation result is written to Kafka in the following way.
Because KafkaTableSink does not support retract mode, I am not
sure whether this method will affect the calculation result.

val userTest: Table = tEnv.sqlQuery(sql)

val endStream = tEnv.toRetractStream[Row](userTest)

//userTest.insertInto("kafkaSink")

val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers,         // broker list
  topic,   // target topic
  new SimpleStringSchema)   // serialization schema

endStream.map(x=>{
  s"${x._1}:${x._2.toString}"
}).addSink(myProducer)



--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Does Flink SQL "in" operation has length limit?

2018-10-01 Thread Timo Walther

Hi,

tuple should not be used anywhere in flink-table. @Rong can you point us 
to the corresponding code? I haven't looked into the code but we should 
definitely support this query. @Henry feel free to open an issue for it.


Regards,
Timo


Am 28.09.18 um 19:14 schrieb Rong Rong:

Yes.

Thanks for bringing this up Hequn! :-) I think Tuple would not be the 
best container to use.


However, in search for alternative, shouldn't Collection / List be a 
more suitable solution? Row seems to not fit in the context (as there 
can be Rows with elements of different type).
I vaguely recall there was similar JIRA but might not be related to IN 
clause. Let me try to dig it up.


--
Rong

On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng > wrote:


Hi,

I haven't looked into the code. If this is limited by Tuple, would
it be better to implement it with Row?

Best, Hequn

On Fri, Sep 28, 2018 at 9:27 PM Rong Rong mailto:walter...@gmail.com>> wrote:

Hi Henry, Vino.

I think IN operator was translated into either a RexSubQuery
or a SqlStdOperatorTable.IN operator.
I think Vino was referring to the first case.
For the second case (I think that's what you are facing here),
the elements are converted into tuples, and the maximum we currently
have in Flink is Tuple25.java, so I was wondering if that is
the issue you are facing. You can probably split the IN into
many INs combined with OR.

--
Rong

On Fri, Sep 28, 2018 at 2:33 AM vino yang
mailto:yanghua1...@gmail.com>> wrote:

Hi Henry,

Maybe the number of elements in your IN clause is out of
range? Its default value is 20, you can modify it with
this configuration item:

`withInSubQueryThreshold(XXX)`

This API comes from Calcite.

Thanks, vino.

徐涛 mailto:happydexu...@gmail.com>> 于2018年9月28日周五
下午4:23写道:

Hi,

When I execute the following SQL in Flink 1.6.1, an error is thrown 
saying that it has a support issue, but when I reduce the 
number of integers in the “in” clause, for example,

trackId in (124427150,71648998), Flink does not
complain at all, so I wonder whether there is any length
limit on the “in” operation?

Thanks a lot.

SELECT
 trackId as id,track_title as description, count(*) as cnt
FROM
 play
WHERE
 appName='play.statistics.trace' and
 trackId in 
(124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
GROUP BY
 HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
MINUTE),trackId,track_title;



FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
FlinkLogicalCalc(expr#0..3=[{inputs}],
started_at_ts=[$t2], trackId=[$t0], track_title=[$t1])
    FlinkLogicalJoin(condition=[=($0, $3)],
joinType=[inner])
FlinkLogicalCalc(expr#0..4=[{inputs}],
expr#5=[_UTF-16LE'play.statistics.trace'],
expr#6=[=($t0, $t5)], trackId=[$t1],
track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
FlinkLogicalNativeTableScan(table=[[play]])
      FlinkLogicalValues(tuples=[[{ 124427150 }, {
71648998 }, { 124493327 }, { 524043 }, { 27300837 }, {
30300481 }, { 27300809 }, { 124744768 }, { 45982512 },
{ 124526566 }, { 124556427 }, { 124804208 }, {
74302264 }, { 119588973 }, { 30496269 }, { 27300288 },
{ 124098818 }, { 125071530 }, { 120918746 }, {
124171456 }, { 30413034 }, { 124888075 }, { 125270551
}, { 125434224 }, { 27300195 }, { 45982342 }, {
45982468 }, { 45982355 }, { 65349883 }, { 124705962 },
{ 65349905 }, { 124298305 }, { 124889583 }, { 45982338
}, { 20506255 }, { 18556415 }, { 122161128 }, {
27299018 }, { 122850375 }, { 124862362 }, { 45982336
}, { 59613202 }, { 122991190 }, { 124590280 }, {
124867563 }, { 45982332 }, { 124515944 }, { 20506257
}, { 122572115 }, { 92083574 }]])

This exception indicates that the query uses an
u

Re: Rowtime for Table from DataStream without explixit fieldnames

2018-10-04 Thread Timo Walther

Hi Johannes,

this is not supported so far. You could write a little helper method 
like the following:


val s: Seq[Expression] =
  Types.of[WC].asInstanceOf[CaseClassTypeInfo[WC]].fieldNames.map(Symbol(_).toExpr)

val s2: Seq[Expression] = s :+ 'rowtime.rowtime

tEnv.fromDataSet(input, s2: _*)


Not a very nice solution, but it should work.

Regards,
Timo

Am 04.10.18 um 15:40 schrieb Dawid Wysakowicz:

Hi Johannes,

I am afraid that this is currently not possible and indeed you have to
pass all fields again, but Timo (cc'ed) might want to correct me if I am wrong.

Best,

Dawid


On 04/10/18 15:08, Johannes Schulte wrote:

Hi,

when converting a DataStream (with Watermarks) to a table like
described here

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time

I wonder how to use the rowtime in a subsequent window operation
_without_ explicitly specifying all field names and hence relying on case
class type inference.

Currently when operating on a stream of events

case class Event(field1: String, ts: long)

val ds: DataStream[Event] = ...

I have to do

tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)

to do

.window(Tumble over 1.hours on 'myRowtime  as 'w)

afterwards. Is there a way to create the TimeAttribute column without
specifiying all fields again?

Thanks for yout help,

Johannes






Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Timo Walther

Hi Xuefu,

thanks for your proposal, it is a nice summary. Here are my thoughts to 
your list:


1. I think this is also on our current mid-term roadmap. Flink has lacked 
proper catalog support for a very long time. Before we can connect 
catalogs, we need to define how to map all the information from a catalog 
to Flink's representation. This is why the work on the unified connector 
API [1] has been going on for quite some time, as it is the first approach to 
discuss and represent the pure characteristics of connectors.
2. It would be helpful to figure out what is missing in [1] to ensure 
this point. I guess we will need a new design document just for a proper 
Hive catalog integration.
3. This is already work in progress. ORC has been merged, Parquet is on 
its way [2].
4. This should be easy. There was a PR in the past that I reviewed, but it 
was not maintained anymore.
5. The type system of Flink SQL is very flexible. Only UNION type is 
missing.
6. A Flink SQL DDL is on the roadmap soon once we are done with [1]. 
Support for Hive syntax also needs cooperation with Apache Calcite.

7-11. Long-term goals.

I would also propose to start with a smaller scope from which current 
Flink SQL users can also profit: 1, 2, 5, 3. This would allow the 
Flink SQL ecosystem to grow. After that we can aim to be fully compatible, 
including syntax and UDFs (4, 6, etc.). Once the core is ready, we can 
work on the tooling (7, 8, 9) and performance (10, 11).


@Jörn: Yes, we should not have a tight dependency on Hive. It should be 
treated as one "connector" system out of many.


Thanks,
Timo

[1] 
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#

[2] https://github.com/apache/flink/pull/6483

Am 11.10.18 um 07:54 schrieb Jörn Franke:

Would it maybe make sense to provide Flink as an engine on Hive 
(„flink-on-Hive“)? Eg to address 4,5,6,8,9,10. this could be more loosely 
coupled than integrating hive in all possible flink core modules and thus 
introducing a very tight dependency to Hive in the core.
1,2,3 could be achieved via a connector based on the Flink Table API.
Just as a proposal to start this Endeavour as independent projects (hive 
engine, connector) to avoid too tight coupling with Flink. Maybe in a more 
distant future if the Hive integration is heavily demanded one could then 
integrate it more tightly if needed.

What is meant by 11?

Am 11.10.2018 um 05:01 schrieb Zhang, Xuefu :

Hi Fabian/Vino,

Thank you very much for your encouraging inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-terms goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both read/write access, which 
means Flink can make full use of Hive's metastore as its catalog (at least for 
the batch but can extend for streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc) 
created by Hive can be understood by Flink and the reverse direction is true 
also.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by 
Flink and vice versa.
4. Support Hive UDFs - For all Hive's native udfs, Flink either provides its 
own implementation or make Hive's implementation work in Flink. Further, for 
user created UDFs in Hive, Flink SQL should provide a mechanism allowing user 
to import them into Flink without any code change required.
5. Data types -  Flink SQL should support all data types that are available in 
Hive.
6. SQL Language - Flink SQL should support SQL standard (such as SQL2003) with 
extension to support Hive's syntax and language features, around DDL, DML, and 
SELECT queries.
7.  SQL CLI - this is currently developing in Flink but more effort is needed.
8. Server - provide a server that's compatible with Hive's HiverServer2 in 
thrift APIs, such that HiveServer2 users can reuse their existing client (such 
as beeline) but connect to Flink's thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
application to use to connect to its thrift server
10. Support other user's customizations in Hive, such as Hive Serdes, storage 
handlers, etc.
11. Better task failure tolerance and task scheduling at Flink runtime.

As you can see, achieving all those requires significant effort and across all 
layers in Flink. However, a short-term goal could  include only core areas 
(such as 1, 2, 4, 5, 6, 7) or start  at a smaller scope (such as #3, #6).

Please share your further thoughts. If we generally agree that this is the 
right direction, I could come up with a formal proposal quickly and then we can 
follow up with broader discussions.

Thanks,
Xuefu



--
Sender:vino yang 
Sent at:2018 Oct 11 (Thu) 09:45
Recipient:Fabian Hueske 
Cc:dev ; Xu

Re: User jar is present in the flink job manager's class path

2018-10-11 Thread Timo Walther

Hi,

did you try to change the classloading strategy? Maybe this problem 
could be fixed by configuring the ClassLoader resolution order [1].


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html


Am 11.10.18 um 10:49 schrieb yinhua.dai:

We have a customized log4j layout implementation, so we need the Flink job
manager/task manager to be able to load the logger implementation, which is
packaged in the uber jar.

However, we noticed that in Flink 1.3 the user jar is put at the beginning
of the job manager's classpath; when we do the same again in Flink 1.5, the
user jar is not there any more.
Is this expected?

I saw this in the document:
*When submitting a Flink job/application directly to YARN (via bin/flink run
-m yarn-cluster ...), dedicated TaskManagers and JobManagers are started for
that job. Those JVMs have both Flink framework classes and user code classes
in the Java classpath. That means that there is no dynamic classloading
involved in that case.*

And we are using Flink on YARN in per-job mode,
so we are confused by what we are experiencing now.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: User jar is present in the flink job manager's class path

2018-10-11 Thread Timo Walther
Yes, you are right. I was not aware that the resolution order depends on 
the cluster deployment. I will loop in Gary (in CC), who might know 
about such a YARN setup.


Regards,
Timo

Am 11.10.18 um 15:47 schrieb yinhua.dai:

Hi Timo,

I didn't try to configure the classloader order; according to the
documentation, it should only be needed for yarn-session mode, right?

I can see the ship files (-yt /path/dir/) are present in the job manager's
classpath, so maybe I should put my uber jar in the -yt path so that it will
be shipped and added to the classpath in Flink 1.5?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink Table API and table name

2018-10-16 Thread Timo Walther

Hi Flavio,

yes you are right, I don't see a reason why we should not support such 
table names. Feel free to open an issue for it.


Regards,
Timo


Am 16.10.18 um 10:56 schrieb miki haiat:
I'm not sure if it will solve this issue, but can you try to register 
your catalog [1]?


1.https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html#register-an-external-catalog


On Tue, Oct 16, 2018 at 11:40 AM Flavio Pompermaier 
mailto:pomperma...@okkam.it>> wrote:


Hi to all,
in my job I'm trying to read a dataset whose name/id starts with a
number.
It seems that when using the Table API to read that dataset, if
the name starts with a number it is a problem..am I wrong?  I
can't find anything about table id constraints on the
documentation and it seems that it's not possible to escape the
name..for the moment I've added a 'T' in front of the name in
order to have something like T1 or T2 but it looks like a
workaround to me..

Best,
Flavio





Re: Reverse the order of fields in Flink SQL

2018-10-23 Thread Timo Walther

Hi Yinhua,

your custom sink must implement 
`org.apache.flink.table.sinks.TableSink#configure`. This method is 
called when writing to a sink so that the sink can configure itself 
for the reversed order. The methods `getFieldTypes` and `getFieldNames` 
must then return the reconfigured schema, which must match the query 
result schema.
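
For illustration, a minimal sketch of such a sink could look like the 
following (the class is illustrative, not the sink from the question):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class MyAppendSink implements AppendStreamTableSink<Row> {

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // called by the planner with the schema of the query result (e.g. f2, f1);
        // return a copy that carries exactly this schema
        MyAppendSink configured = new MyAppendSink();
        configured.fieldNames = fieldNames;
        configured.fieldTypes = fieldTypes;
        return configured;
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.print(); // write to the actual target system here
    }
}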


Regards,
Timo


Am 23.10.18 um 09:47 schrieb yinhua.dai:

I wrote a customized table source, and it emits some fields, let's say f1 and f2.

And then I just write to a sink with a reversed order of fields, as below:
*select f2, f1 from customTableSource*

And I found that it doesn't actually reverse the fields.


Then I tried with the Flink-provided CsvTableSource and CsvTableSink, and I found
that they have no problem reversing the order. After some investigation I
found that it's related to two things:
1. *CsvTableSource* implements *ProjectableTableSource*
2. *RowCsvInputFormat* supports *selectedFields*

Do I have to do these two things in my custom table source as well to get the
reversal to work?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Java Table API and external catalog bug?

2018-10-25 Thread Timo Walther

Hi Flavio,

the external catalog support is not feature complete yet. I think you 
can only specify the catalog when reading from a table but `insertInto` 
does not consider the catalog name.


Regards,
Timo


Am 25.10.18 um 10:04 schrieb Flavio Pompermaier:

Any other help here? is this a bug or something wrong in my code?

On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier 
mailto:pomperma...@okkam.it>> wrote:


I've tried with t2, test.t2 and test.test.t2.

On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, mailto:xuef...@alibaba-inc.com>> wrote:

Have you tried "t2" instead of "test.t2"? There is a
possibility that catalog name isn't part of the table name in
the table API.

Thanks,
Xuefu

--
Sender:Flavio Pompermaier mailto:pomperma...@okkam.it>>
Sent at:2018 Oct 22 (Mon) 23:06
Recipient:user mailto:user@flink.apache.org>>
Subject:Java Table API and external catalog bug?

Hi to all,
I've tried to register an external catalog and use it with
the Table API in Flink 1.6.1.
The following (Java) test job cannot write to a sink using
insertInto because Flink cannot find the table by id
(test.t2). Am I doing something wrong or is this a bug?

This is my Java test class:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.InMemoryExternalCatalog;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;

public class CatalogExperiment {
  public static void main(String[] args) throws Exception {
    // create an external catalog
    final String outPath = "file:/tmp/file2.txt";
    InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
    FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
    FileSystem connDescOut = new FileSystem().path(outPath);
    FormatDescriptor csvDesc = new Csv()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string")//
        .fieldDelimiter("\t");
    Schema schemaDesc = new Schema()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string");
    ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
        .withFormat(csvDesc)//
        .withSchema(schemaDesc)//
        .asTableSource();
    ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
        .withFormat(csvDesc)//
        .withSchema(schemaDesc)//
        .asTableSink();
    catalog.createTable("t1", t1, true);
    catalog.createTable("t2", t2, true);

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    btEnv.registerExternalCatalog("test", catalog);
    // this does not work ---
    btEnv.scan("test", "t1").insertInto("test.t2"); // ERROR: No table was registered under the name test.t2
    // this works ---
    btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
    env.execute();
  }
}


Best,
Flavio






Re: Needed more information about jdbc sink in Flink SQL Client

2018-10-29 Thread Timo Walther

Hi,

all supported connectors and formats for the SQL Client with YAML can be 
found in the connect section [1]. However, the JDBC sink is not 
available for the SQL Client so far. It still needs to be ported, see [2].


However, if you want to use it, you could implement your own table 
factory that creates an instance of the sink. This is documented here [3].
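
For reference, a rough and untested sketch of such a factory is shown below. 
The property keys and the JDBCAppendTableSink builder calls are assumptions 
based on flink-jdbc 1.6 and need to be checked against the actual release; 
the INSERT statement is purely illustrative. The class must additionally be 
listed in META-INF/services/org.apache.flink.table.factories.TableFactory of 
your jar.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

public class JdbcTableSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "jdbc"); // matches `type: jdbc` in the YAML connector section
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.driverName");
        properties.add("connector.dbURL");
        properties.add("connector.username");
        properties.add("connector.password");
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        // build the actual sink from the YAML properties
        return JDBCAppendTableSink.builder()
            .setDrivername(properties.get("connector.driverName"))
            .setDBUrl(properties.get("connector.dbURL"))
            .setUsername(properties.get("connector.username"))
            .setPassword(properties.get("connector.password"))
            .setQuery("INSERT INTO SinkTable VALUES (?)") // illustrative statement
            .setParameterTypes(Types.STRING)
            .build();
    }
}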


I hope this helps.

Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#further-tablesources-and-tablesinks
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sourceSinks.html#define-a-tablefactory


Am 29.10.18 um 14:55 schrieb Narayanan, Surat:

Flink team,

I am exploring the Flink SQL client and trying to configure JDBC Sink 
in YAML

I only find some sample YAML configuration in the documentation.
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sqlClient.html
Where can I find the entire definitions for that YAML configuration?

 tables:
  - name: SinkTable
    type: sink
    update-mode: append
    connector:
      type: jdbc
      driverName: org.apache.derby.jdbc.EmbeddedDriver
      dbURL: jdbc:oracle:thin:@host:port/serviceName
      username: dbuser
      password: 



Regards,
N.Surat





Re: Wired behavior of DATE_FORMAT UDF in Flink SQL

2018-10-31 Thread Timo Walther

Hi Henry,

the DATE_FORMAT function is in a very bad state right now. I would 
recommend implementing your own custom function for now.
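
For example, a minimal replacement could be a scalar UDF along these lines 
(class and function names are illustrative):

import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;

import org.apache.flink.table.functions.ScalarFunction;

public class MyDateFormat extends ScalarFunction {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // SQL TIMESTAMP arrives as java.sql.Timestamp; the result is a proper STRING
    public String eval(Timestamp ts) {
        return ts == null ? null : ts.toLocalDateTime().format(FORMATTER);
    }
}

Registered and used, for example, as:

tableEnv.registerFunction("MY_DATE_FORMAT", new MyDateFormat());
// SELECT ALBUM_ID, MY_DATE_FORMAT(CREATE_TIME) AS unlock_date FROM share_album_unlock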


This issue is tracked here: 
https://issues.apache.org/jira/browse/FLINK-10032


Regards,
Timo


Am 31.10.18 um 07:44 schrieb 徐涛:

Hi Experts,
I found that DATE_FORMAT(timestamp, format) returns a TIMESTAMP type, 
which is weird, because normally the result of formatting should be a string type.
The documentation says “Formats timestamp as a string using a specified 
format string”. But when I run it in Flink SQL,

select
 ALBUM_ID,
 UNLOCK_USER_ID,
 DATE_FORMAT(CREATE_TIME, '%Y-%m-%d') as unlock_date
 from share_album_unlock;

The error throws out:
org.apache.flink.table.api.ValidationException: Field types of query 
result and registered TableSink mysqlSink do not match.

Query result schema: [ALBUM_ID: Long, unlock_date: Timestamp, score: Long]
TableSink schema:    [ALBUM_ID: Long, time_window: String, score: Long]

The flink version is 1.6.2.
Is it a program bug or something else?

Best
Henry





Re: Non deterministic result with Table API SQL

2018-10-31 Thread Timo Walther

Hi Flavio,

do you execute this query in a batch or stream execution environment?

In any case this sounds very strange to me. But is it guaranteed that it 
is not the fault of the connector?


Regards,
Timo


Am 31.10.18 um 14:54 schrieb Flavio Pompermaier:

Hi to all,
I'm using Flink 1.6.1 and I get different results when running the 
same query on the same static dataset. There are times that I get a 
'NaN' as result of a select field-expression, while other times I get 
a valid double. How is this possible?
This seems to happen only when I execute a complex query, while it does 
not happen when I isolate the 2 select clauses causing the error (i.e.:

*SELECT*
ROUND(STDDEV_POP(CAST(imposteResidue AS DOUBLE)),2),
SUM(CASE WHEN field1 IS NULL THEN 1 ELSE 0 END)
*FROM* T1)

Best,
Flavio





Re: Non deterministic result with Table API SQL

2018-10-31 Thread Timo Walther
As far as I know STDDEV_POP is translated into basic aggregate functions 
(SUM/AVG/COUNT). But if this error is reproducible in a little test 
case, we should definitely track this in JIRA.



Am 31.10.18 um 16:43 schrieb Flavio Pompermaier:
Adding more rows to the dataset leads to a deterministic error. My 
tests say that the problem arises when adding STDDEV_POP to the 
query.
Do you think it could be possible that there's a concurrency problem 
in its implementation?






Re: Flink SQL questions

2018-11-01 Thread Timo Walther
Usually, the problem occurs when users import the wrong classes. The 
current class naming is a bit confusing as there are 3 
StreamTableEnvironment classes. You need to choose the one that matches 
your programming language. E.g. 
org.apache.flink.table.api.java.StreamTableEnvironment.
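
A minimal Java skeleton with the matching imports would look like this (field 
names are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment; // the Java variant
import org.apache.flink.types.Row;

public class TableApiSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("hello", 1));

        // fromDataStream and registerDataStream are available on the Java StreamTableEnvironment
        Table t = tableEnv.fromDataStream(stream, "word, frequency");
        tableEnv.registerDataStream("MyTable", stream, "word, frequency");

        tableEnv.toAppendStream(t, Row.class).print();
        env.execute();
    }
}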


Regards,
Timo

Am 02.11.18 um 04:32 schrieb Michael Latta:

Thanks, I will check it out.

Michael

Sent from my iPad

On Nov 1, 2018, at 8:22 PM, Hequn Cheng > wrote:



Hi Michael,

There are some test cases in Flink git, such as[1] which I think may 
help you.


Best, Hequn
[1] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java



On Fri, Nov 2, 2018 at 7:46 AM TechnoMage > wrote:


I am trying to get a minimal Flink SQL program going to test it
out.  I have a StreamExecutionEnvironment and from that created a
StreamTableEnvironment.  The docs indicate there should be a
fromDataStream method to create a Table, but none appears to
exist according to Eclipse.  The method registerDataStream is
also missing, just registerDataStreamInternal.  The "Internal" suffix
suggests a private API to me, so I am asking if the docs are out of
date, or I am missing some library or such.  I am using Java 8,
not Scala.

Michael Latta





Re: Why dont't have a csv formatter for kafka table source

2018-11-02 Thread Timo Walther

I already answered his question but forgot to CC the mailing list:

Hi Jocean,

a standard compliant CSV format for a Kafka table source is in the 
making right now. There is a PR that implements it [1] but it needs 
another review pass. It is high on my priority list and I hope we can 
finalize it after the 1.7 release is out. Feel free to help here by 
reviewing and trying it out.


Regards,
Timo

[1] https://github.com/apache/flink/pull/6541


Am 02.11.18 um 10:11 schrieb Till Rohrmann:

Hi Jocean,

these kind of issues should go to the user mailing list. I've cross 
posted it there and put dev to bcc.


Cheers,
Till

On Fri, Nov 2, 2018 at 6:43 AM Jocean shi > wrote:


Hi all,
     I have encountered an error when I want to register a table
from Kafka
using the CSV format.
     The error is "Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory"

Jocean





Re: Understanding checkpoint behavior

2018-11-12 Thread Timo Walther

Hi,

do you observe such long checkpoint times also without performing 
external calls? If not, I guess the communication to the external system 
is flaky.


Maybe you have to rethink how you perform such calls in order to make 
the pipeline more robust against these latencies. Flink also offers an 
async operator [1] for exactly such cases, this could be worth a look.


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html



Am 05.11.18 um 18:52 schrieb PranjalChauhan:

Hi,

I am a new Flink user, currently using Flink 1.2.1. I am trying to
understand how checkpoints actually work when the window operator is
processing events.

My pipeline has the following flow where each operator's parallelism is 1.
source -> flatmap -> tumbling window -> sink
In this pipeline, I had configured the window to be evaluated every 1 hour
(3600 seconds) and the checkpoint interval was 5 mins. The checkpoint
timeout was set to 1 hour as I wanted the checkpoints to complete.

In my window function, the job makes HTTPS calls to another service, so the
window function may take some time to evaluate/process all events.

Please refer to the following image. In this case, the window was triggered at
23:00:00. Checkpoint 12 was triggered soon after that and I notice that
checkpoint 12 takes long time to complete (compared to other checkpoints
when window function is not processing events).


The following images show the checkpoint 12 details of the window & sink operators.



I see that the time spent for checkpoint was actually just 5 ms & 8 ms
(checkpoint duration sync) for window & sink operators. However, End to End
Duration for checkpoint was 11m 12s for both window & sink operator.

Is this expected behavior? If yes, do you have any suggestion to reduce the
end to end checkpoint duration?

Please let me know if any more information is needed.

Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Report failed job submission

2018-11-12 Thread Timo Walther

Hi Flavio,

I'm not entirely sure if I understand your question correctly, but what you 
are looking for is more information (like a categorization) about why the 
submission failed, right?


Regards,
Timo


Am 06.11.18 um 14:33 schrieb Flavio Pompermaier:

Any idea about how to address this issue?
On Tue, Oct 16, 2018 at 11:32 AM Flavio Pompermaier 
mailto:pomperma...@okkam.it>> wrote:


Hi to all,
which is the correct way to report a job submission failure back to the
user in Flink?
If everything is OK the job run API returns the job id, but what if
there are errors in parameter validation or some other problem?
Which is the right way to report the job error details back to the user
(apart from throwing an Exception)?

Best,
Flavio







Re: How to run Flink 1.6 job cluster in "standalone" mode?

2018-11-12 Thread Timo Walther

Hi,

a session cluster does not imply that JM + TM are always executed in the 
same JVM. Debugging a job running on different JVMs might be a bit more 
difficult, but it should still be straightforward.


Maybe you can tell us what wrong behavior you observe?

Btw. Flink's metrics can also already be quite helpful.

Regards,
Timo

Am 07.11.18 um 14:15 schrieb Hao Sun:
"Standalone" here I mean job-mananger + taskmanager on the same JVM. I 
have an issue to debug on our K8S environment, I can not reproduce it 
in local docker env or Intellij. If JM and TM are running in different 
VMs, it makes things harder to debug.


Or is there a way to debug a job running on JM + TM on different VMs?
Is reverting to session cluster the only way to get JM + TM on the 
same VM?





Re: Report failed job submission

2018-11-12 Thread Timo Walther
I assume you are using the REST API? Flink's RestClusterClient is able 
to deserialize the exception including its cause, which might be more 
helpful in your case as well.


The entire exception should be queryable via the execution result [1]. As 
an example, at least we get a better error using the SQL Client [2].


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jobs-jobid-execution-result
[2] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java 




Am 12.11.18 um 16:00 schrieb Flavio Pompermaier:
Let's say that my job needs to do some check before running (like 
existence of a file or some other condition): at the moment I can only 
throw an Exception but on the client side you get only something 
like: {"errors":["org.apache.flink.client.program.ProgramInvocationException: 
The main method caused an error."]}


I was wondering if there is any better way to handle this kind of 
problem.


On Mon, Nov 12, 2018 at 3:53 PM Timo Walther <mailto:twal...@apache.org>> wrote:


Hi Flavio,

I'm not entirely sure if I get your question correct but what you
are looking for is more information (like categorization) why the
submission failed right?

Regards,
Timo


Am 06.11.18 um 14:33 schrieb Flavio Pompermaier:

Any idea about how to address this issue?
On Tue, Oct 16, 2018 at 11:32 AM Flavio Pompermaier
mailto:pomperma...@okkam.it>> wrote:

Hi to all,
which is the correct wat to report back to the user a failure
from a job submission in FLink?
If everything is OK the job run API returns the job id but
what if there are error in parameter validation or some other
problem?
Which is the right way to report back to the user the job
error detail (apart from throwing an Exception)?

Best,
Flavio










Re: Run a Flink job: REST/ binary client

2018-11-12 Thread Timo Walther

I will loop in Chesnay. He might know more about the REST service internals.

Timo

Am 07.11.18 um 16:15 schrieb Flavio Pompermaier:
After a painful migration to Flink 1.6.2 we were able to run one of 
the jobs.
Unfortunately we faced the same behaviour: all the code after the 
first env.execute() is not executed if the job is called from the REST 
services or from the web UI, while everything works fine if running 
the job using 'bin/flink run' from a shell.


Any solution to this?

On Tue, Nov 6, 2018 at 4:55 PM Flavio Pompermaier 
mailto:pomperma...@okkam.it>> wrote:


Hi to all,
I'm using Flink 1.3.2. If executing a job using bin/flink run
everything goes well.
If executing using REST service of job manager (/jars:jarid/run)
the job writes to the sink but fails to return on env.execute()
and all the code after it is not executed.

Is this a known issue? Was it resolved in Flink 1.6.2?

Best,
Flavio






Re: flink run from savepoint

2018-11-12 Thread Timo Walther

Hi Franck,

as a first hint: paths are hard-coded in the savepoint's metadata so you 
should make sure that the path is still the same and accessible by all 
JobManagers and TaskManagers.


Can you share logs with us to figure out what caused the internal server 
error?


Thanks,
Timo


Am 07.11.18 um 17:34 schrieb Cussac, Franck:


Hi,

I’m working with Flink 1.5.0 and I try to run a job from a savepoint. 
My jobmanager is dockerized and I try to run my flink job in another 
container.


The command :

flink run -m jobmanager:8081 myJar.jar

works fine, but when I try to run a job from a savepoint, I get an 
Internal Server Error.


Here my command to run flink job and the stacktrace :

flink run -m jobmanager:8081 -s file:/tmp/test/savepoint/ myJar.jar

Starting execution of program



The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not 
retrieve the execution result.


at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)


at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)


at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)


at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)


at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)


at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)


at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)


at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)


at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)


at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)


at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)


at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)


at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)


at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)


at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)

Caused by: org.apache.flink.runtime.client.JobSubmissionException: 
Failed to submit JobGraph.


at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)


at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)


at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)


at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)


at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)


at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)


at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)


at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)


at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)


at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)


at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)


at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)


at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)


at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)


at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could 
not complete the operation. Exception is not retryable.


at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)


at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)


at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)


at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)


... 12 more

Caused by: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could 
not complete the operation. Exception is not retryable.


... 10 more

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error.]


at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)


at 
java.util.concurrent.CompletableFuture.completeRelay(Completable

Re: InterruptedException when async function is cancelled

2018-11-12 Thread Timo Walther

Hi Anil,

if I researched this correctly, we are talking about these changes [1]. I 
don't know if you can backport them, but I hope this helps.


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9304


Am 07.11.18 um 17:41 schrieb Anil:

Hi Till,
 Thanks for the reply. Is there any particular patch I can use as
upgrading to Flink 1.6 is not an option for me at the moment.
Regards,
Anil.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Implementation error: Unhandled exception - "Implementation error: Unhandled exception."

2018-11-12 Thread Timo Walther

Hi Richard,

this sounds like a bug to me. I will loop in Till (in CC) who might know 
more about this.


Regards,
Timo


Am 07.11.18 um 20:35 schrieb Richard Deurwaarder:

Hello,

We have a Flink job/cluster running in Kubernetes, on Flink 1.6.2 (but 
the same happens in 1.6.0 and 1.6.1). To upgrade our job we use the 
REST API.


Every so often the jobmanager seems to be stuck in a crashing state 
and the logs show me this stack trace:


2018-11-07 18:43:05,815 [flink-scheduler-1] ERROR 
org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler - 
Implementation error: Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher#1016927511]] after [1 ms]. 
Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.Implementation error: Unhandled 
exception.".
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)

at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)

at java.lang.Thread.run(Thread.java:748)

If I restart the jobmanager everything is fine afterwards, but the 
jobmanager will not restart by itself.


What might've caused this and is this something we can prevent?

Richard





Re: Multiple operators to the same sink

2018-11-12 Thread Timo Walther

Hi,

I'm not quite sure if I understand your problem correctly. But your use 
case sounds like a typical application of a union operation.


What do you mean by "knowledge of their destination sink"? The 
operators don't need to be aware of the destination sink. The only thing 
that needs to be coordinated is the result data type of each operation. 
So either you force each operation to have a unified type or you create 
a unified type before declaring the sink.
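
A compact sketch of the first variant, assuming a StreamExecutionEnvironment 
`env` and String outputs (the sources below only stand in for the outputs of 
A, B and C):

DataStream<String> fromA = env.fromElements("from-A");
DataStream<String> fromB = env.fromElements("from-B");
DataStream<String> fromC = env.fromElements("from-C");

// the three outputs are unified into one stream that feeds a single sink instance
fromA.union(fromB, fromC)
     .addSink(new PrintSinkFunction<>()); // stands in for Sink1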


Or is every operator an independent Flink job? Maybe you can show us a 
skeleton of your pipeline?


Regards,
Timo


Am 08.11.18 um 01:20 schrieb burgesschen:

Hi Guys! I'm designing a topology where multiple operators should forward the
messages to the same sink.


For example I have Operator A,B,C,D,E. I want A,B,C to forward to Sink1 and
D, E to forward to Sink2.

My options are

1. Union A, B and C, then add Sink1 to them. Similarly for D and E. However,
in the current framework our team has built, each operator is built
individually. There is nothing outside of the operators that has knowledge
of their destination sink. It means we need to build something at the job
level to union the operators.

2. Have each operator output to a side output tag. A, B, and C will output to
tag "sink1", and have a singleton sink1 consume from tag "sink1".
Similarly for sink2. My concern here is that it feels hacky, since those
messages are not really side outputs.

Is this a legitimate use case for output tags or not? Is there a better way
to achieve this? Thank you!






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: flink job restarts when flink cluster restarts?

2018-11-12 Thread Timo Walther

Hi,

by default all the metadata is lost when shutting down the JobManager in 
a non-highly-available setup. Flink uses Zookeeper together with a 
distributed filesystem to store the required metadata [1] in a 
persistent and distributed manner.


A single node setup is rather uncommon, but you can also start Zookeeper 
locally as it is done in our end-to-end tests [2].


I hope this helps.

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
[2] 
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh



Am 08.11.18 um 14:15 schrieb Chang Liu:
To put it differently, how can I keep the jobs running across system patching, 
server restarts, etc.? Is it related to Standalone vs YARN? Or is it related to 
whether Zookeeper is used?


Many thanks!

Best regards/祝好,

Chang Liu 刘畅


On 8 Nov 2018, at 13:38, Chang Liu > wrote:


Thanks!

If I have a cluster with more than one node (standalone or YARN), can I 
stop and start any single node among them and keep the job running?


Best regards/祝好,

Chang Liu 刘畅


On 7 Nov 2018, at 16:17, 秦超峰 <18637156...@163.com 
> wrote:


the second



秦超峰
邮箱:windyqinchaof...@163.com

 



签名由 网易邮箱大师  
定制


On 11/07/2018 17:14, Chang Liu  wrote:

Hi,

I have a question regarding whether the currently running jobs will
restart if I stop and start the Flink cluster.

1. Let’s say I just have a standalone one-node cluster.
2. I have several Flink jobs already running on the cluster.
3. If I do a bin/cluster-stop.sh and then do a
bin/cluster-start.sh, will the previously running jobs restart again?

OR

Before I do bin/cluster-stop.sh, I have to take savepoints for
each of the jobs.
After bin/cluster-start.sh is finished, I have to start each job I
want to restart from the savepoint triggered before.

Many thanks in advance :)

Best regards/祝好,

Chang Liu 刘畅










Re: Rich variant for Async IO in Scala

2018-11-12 Thread Timo Walther

Hi Bruno,

`org.apache.flink.streaming.api.functions.async.RichAsyncFunction` 
should also work for the Scala API. `RichMapFunction` or 
`RichFilterFunction` are also shared between both APIs.


Is there anything that blocks you from using it?

Regards,
Timo

Am 09.11.18 um 01:38 schrieb Bruno Aranda:

Hi,

I see that the AsyncFunction for Scala does not seem to have a rich 
variant like the Java one. Is there a particular reason for this? Is 
there any workaround?


Thanks!

Bruno





Re: ***UNCHECKED*** Table To String

2018-11-13 Thread Timo Walther

Hi Steve,

if you are ok with using the DataStream API you can simply use a map() 
function [1] and call row.toString(). However, usually people want 
custom logic to construct a string. This logic could either be in SQL 
using the concat operator `||` or in the DataStream API.


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/#datastream-transformations



Am 13.11.18 um 07:46 schrieb Steve Beischlien:
I have created a project to use SQL but instead of printing the output 
as below I need the output in a STRING so I can write it to a dynamoDB 
table.


How do I convert this "result" to a STRING or is there a suggestion of 
some other way I should sink to dynamoDB?  Any example code would 
REALLY help. THANKS!!


Table result = tableEnv.sql("SELECT 'Alert ',t_sKey, t_deviceID, t_sValue FROM SENSORS WHERE t_sKey='TEMPERATURE' AND t_sValue > " + TEMPERATURE_THRESHOLD);
tableEnv.toAppendStream(result, Row.class).print();

Any assistance would be very appreciated.

Thanks





Re: Rich variant for Async IO in Scala

2018-11-13 Thread Timo Walther
It should compile against a RichAsyncFunction as well. Can you open an 
issue in JIRA for this, including the compiler issues that you observe?


Thank you.

Am 13.11.18 um 15:40 schrieb Bruno Aranda:

Hi,

Tried again last night. The problem is that I was trying to 
use org.apache.flink.streaming.api.*scala*.AsyncDataStream, and that 
won't compile against the RichAsyncFunction. I could change it to use  
org.apache.flink.streaming.api.*datastream*.AsyncDataStream instead, 
but it is not as elegant as it requires the result to be a Java 
collection. But it works.


Thanks!

Bruno

On Mon, 12 Nov 2018 at 16:43 Timo Walther <mailto:twal...@apache.org>> wrote:


Hi Bruno,

`org.apache.flink.streaming.api.functions.async.RichAsyncFunction`
should also work for the Scala API. `RichMapFunction` or
`RichFilterFunction` are also shared between both APIs.

Is there anything that blocks you from using it?

Regards,
Timo

Am 09.11.18 um 01:38 schrieb Bruno Aranda:
> Hi,
>
> I see that the AsyncFunction for Scala does not seem to have a rich
> variant like the Java one. Is there a particular reason for
this? Is
> there any workaround?
>
> Thanks!
>
> Bruno






Re: ***UNCHECKED*** Table To String

2018-11-13 Thread Timo Walther
I would recommend the training exercises by data Artisans [1]. They have 
challenging exercises and also nice solutions in Java and Scala.


Flink's end-to-end test also contain a lot of good example code [2].

I hope this helps.

Regards,
Timo

[1] http://training.data-artisans.com/
[2] https://github.com/apache/flink/tree/master/flink-end-to-end-tests


Am 13.11.18 um 16:28 schrieb Steve Bistline:

Hi Timo,

Thank you... I am not very good with the MapFunction and trying to 
find an example. Will hack at it a bit.


Appreciate your help.

Steve

On Tue, Nov 13, 2018 at 12:42 AM Timo Walther <mailto:twal...@apache.org>> wrote:


Hi Steve,

if you are ok with using the DataStream API you can simply use a
map() function [1] and call row.toString(). However, usually
people want custom logic to construct a string. This logic could
either be in SQL using the concat operator `||` or in the
DataStream API.

Regards,
Timo

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/#datastream-transformations


Am 13.11.18 um 07:46 schrieb Steve Beischlien:

I have created a project to use SQL but instead of printing the
output as below I need the output in a STRING so I can write it
to a dynamoDB table.

How do I convert this "result" to a STRING or is there a
suggestion of some other way I should sink to dynamoDB?  Any
example code would REALLY help. THANKS!!

Table result = tableEnv.sql("SELECT 'Alert ',t_sKey, t_deviceID, t_sValue 
FROM SENSORS WHERE
t_sKey='TEMPERATURE' AND t_sValue > " + TEMPERATURE_THRESHOLD); 
tableEnv.toAppendStream(result, Row.class).print();
Any assistance would be very appreciated.

Thanks







Re: Group by with null keys

2018-11-20 Thread Timo Walther
I assigned the issue to myself because I have wanted to do that for a very 
long time. I already did some prerequisite work for the documentation in 
`org.apache.flink.api.common.typeinfo.Types`.


Thanks,
Timo

Am 20.11.18 um 11:44 schrieb Flavio Pompermaier:
Sure! The problem is that the DataSet API does an implicit conversion to 
Tuples during chaining and I didn't find any documentation about this 
(actually I was pleasantly surprised by the fact that the Table API 
supports aggregates on null values).


Here it is: https://issues.apache.org/jira/browse/FLINK-10947

Thanks for the reply,
Flavio

On Tue, Nov 20, 2018 at 11:33 AM Fabian Hueske > wrote:


Hi Flavio,

Whether groupBy with null values works or not depends on the type
of the key, or more specifically on the TypeComparator and
TypeSerializer that are used to serialize, compare, and hash the
key type.
The processing engine supports null values if the comparator and
serializer can handle null input values.

Flink SQL wraps keys in the Row type and the corresponding
serializer / comparator can handle null fields.
If you use Row in DataSet / DataStream programs, null values are
supported as well.

I think it would be good to discuss the handling of null keys in
the documentation about data types [1] and link to that from
operators that require keys.
Would you mind creating a Jira issue for that?

Thank you,
Fabian

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html

Am Mo., 19. Nov. 2018 um 12:31 Uhr schrieb Flavio Pompermaier
mailto:pomperma...@okkam.it>>:

Hi to all,
we wanted to do a group by on elements that can contains null
values and we discovered that Table API support this while
Dataset API does not.
Is this documented somehwere on the Flink site?

Best,
Flavio

---

PS: you can test this with the following main:

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    final DataSet<String> testDs = env
        .fromElements("test", "test", "test2", "null", "null", "test3")
        .map(x -> "null".equals(x) ? null : x);

    boolean testDatasetApi = true;
    if (testDatasetApi) {
      testDs.groupBy(x -> x).reduceGroup(new GroupReduceFunction<String, Integer>() {

        @Override
        public void reduce(Iterable<String> values, Collector<Integer> out) throws Exception {
          int cnt = 0;
          for (String value : values) {
            cnt++;
          }
          out.collect(cnt);
        }
      }).print();
    }

    btEnv.registerDataSet("TEST", testDs, "field1");
    Table res = btEnv.sqlQuery("SELECT field1, count(*) as cnt FROM TEST GROUP BY field1");
    DataSet<Row> result = btEnv.toDataSet(res,
        new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
    result.print();
  }







Re:

2018-11-27 Thread Timo Walther

Hi Hengyu,

currently, metadata between Flink programs can only be shared via code. 
For this, we recently introduced a programmatic descriptor-based way of 
specifying sources and sinks [1].
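
For example, with the 1.6 descriptor API both the producing and the consuming 
program can call the same declaration code (a sketch; topic, server address 
and field names are illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// put this into a small shared utility so both jobs agree on the columns of "t1"
tableEnv.connect(
        new Kafka()
            .version("0.11")
            .topic("t1")
            .property("bootstrap.servers", "localhost:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(
        new Schema()
            .field("field1", Types.STRING)
            .field("field2", Types.LONG))
    .inAppendMode()
    .registerTableSource("t1");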


However, a catalog such as Hive metastore would be much easier and the 
community is currently working on making this possible [2].


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html
[2] 
https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing



Am 27.11.18 um 09:44 schrieb Henry Dai:

Hi,
Is there a way to get a table's metadata in Flink?
If I emit a table to Kafka, how can I know the table columns when 
I subscribe to the Kafka topic and restore the table using 
`tableEnv.registerDataStream("t1", source, "field1, field2 ...")` in 
another Flink program?


Flink should provide something like Hive's metastore to keep metadata 
of tables.


--
best wishes
hengyu





Re: SQL Query named operator exceeds 80 characters

2018-11-29 Thread Timo Walther

Unfortunately, renaming of operators is not supported so far.

We are currently thinking about a way of having fine-grained control 
over the properties of SQL operators, but this is in an early design phase 
and might take a while.


Regards,
Timo

Am 29.11.18 um 10:32 schrieb Kostas Kloudas:

Hi,

I think that you cannot set it explicitly.

The reason I would say that is that the SQL query gets parsed 
through Calcite and then gets translated to a DataStream program 
through a process that is rather opaque to the user.

That said, I also cc'ed Fabian and Timo, who know more on the topic.

Cheers,
Kostas





Re: Questions about UDTF in flink SQL

2018-11-29 Thread Timo Walther

Hi Wangsan,

currently, UDFs have very strict result type assumptions. This is 
necessary to determine the serializers for the cluster. There were 
multiple requests for more flexible handling of types in UDFs.


Please have a look at:
- [FLINK-7358] Add implicitly converts support for User-defined function
- [FLINK-9294] [table] Improve type inference for UDFs with composite 
parameter and/or result type

- [FLINK-10958] [table] Add overload support for user defined function

If you think those issues do not represent what you need, you can open a 
new issue with a little example of what feature you think is missing.


Regards,
Timo


Am 28.11.18 um 09:59 schrieb wangsan:

Hi all,

When using a user-defined table function in Flink SQL, it seems that the result 
type of a table function must be deterministic.

If I want a UDTF whose result type is determined by its input parameters, what 
should I do?

What I want to do is like this:

```
SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, 
v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, 
f5)
```

I can surely register the same UDTF with different names and configurations, but 
I guess that’s not a good idea :(.

If we cannot do this in Flink SQL for now, maybe we should consider this 
feature in the future?

Best,
wangsan





Re: Table exception

2018-11-29 Thread Timo Walther

Hi Michael,

this dependency issue should have been fixed recently. Which Flink 
version are you using?


Regards,
Timo


Am 29.11.18 um 16:01 schrieb TechnoMage:
I have a simple test for looking at Flink SQL and hit an exception 
reported as a bug.  I wonder though if it is a missing dependency.


Michael

 Error in test
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
com.cogility.hcep.tests.experimental.FlinkSQLTest.lambda$1(FlinkSQLTest.java:171)

at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table 
program cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.compile(GroupAggProcessFunction.scala:39)
at 
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:61)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
... 1 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 43, 
Column 10: Unknown variable or type 
"org.apache.commons.codec.binary.Base64"

at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6443)
at org.codehaus.janino.UnitCompiler.access$13000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6055)
at 
org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6052)

at org.codehaus.janino.Java$Package.accept(Java.java:4074)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438)
at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)

at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)

at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8591)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6689)
at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18$2.visitMethodInvocation(UnitCompiler.java:6100)
at 
org.codehaus.janino.UnitCompiler$18$2.visitMethodInvocation(UnitCompiler.java:6073)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8802)

at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8688)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8590)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4708)
at org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4071)
at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4044)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5224)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4667)
at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)
a

Re: If you are an expert in flink sql, then I really need your help...

2018-12-03 Thread Timo Walther

Hi,

it is very difficult to spot the problem with the little information you 
gave us.


Maybe you can show us a simplified SQL query and the implementation of 
the `LAST_VALUE` function?


An initial guess would be that you are running out of memory such that 
YARN kills your task manager. If you are sure that your state size 
remains constant, you could also try to use a different state backend 
that spills to disk. Have you tried out the RocksDB state backend?


Did you configure a state retention time?
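For reference, a minimal sketch of both suggestions (RocksDB state backend plus idle state retention), assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint path and retention times are placeholders:

```
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class JobSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB keeps state on local disk and supports incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        StreamQueryConfig qConfig = tEnv.queryConfig();
        // Drop state for keys that have been idle for 12 to 24 hours.
        qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
        // qConfig is then passed when emitting the result, e.g.
        // table.writeToSink(sink, qConfig) or tEnv.toRetractStream(table, Row.class, qConfig).
    }
}
```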

Regards,
Timo


Am 03.12.18 um 16:15 schrieb clay:

 I am using Flink SQL to do some complicated calculations. I have
encountered some very difficult problems in this process, so I would like to
ask everyone for help. My goal is to build a data stream with a very
accurate result, which is also in line with the core idea of the book
"Streaming Systems". I use Kafka to receive the MySQL binlog as the data
source, then join it into multiple tables, and then perform complex SQL
calculations on these multiple tables. I found that Flink does not provide an
upsert implementation, so I added a last_value(xxx), last_value(xxx) .. group by (id)
operation for each Kafka data source to ensure consistency of the final result.
This works, and I understand it will cache a dynamic table, resulting in a large
state (about 3 GB), but it seems to introduce some other very strange problems,
summarized as follows:

1. When the SQL is very complicated, checkpointing is clearly turned on, but the
web interface shows no checkpoints at all, not a single one.
2. During the running of the program, it frequently hangs. The error has
always been one of the following:

(1) the assigned slot id_x was removed
(2) the heartbeat with taskmanager was timeout

I have used slotSharingGroup to split tasks into different slots whenever
possible, but I still often report these two errors, causing the program to
hang.

I have no clue about these mistakes. If anyone can help, I really appreciate
it.

Added: I receive data from 4 Kafka topics; the maximum amount of data is
more than 20 million.
My startup command is as follows:

Flink1.6/bin/flink run -m yarn-cluster -ytm 23240 -yn 3 -ys 2 -ynm  -yqu  -c xxx xxx.jar ./test.conf



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: If you are an expert in flink sql, then I really need your help...

2018-12-04 Thread Timo Walther

Hi,

yes, this was an unintended behavior that got fixed in Flink 1.7.

See https://issues.apache.org/jira/browse/FLINK-10474

Regards,
Timo


Am 04.12.18 um 05:21 schrieb clay:

I have found out that the checkpoint is not triggered. Regarding the IN
operation in Flink SQL: this SQL will trigger the checkpoint normally.

select name,age from user where id in
(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840)

This sql will not trigger

(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840,123456)

is this a bug?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: If you are an expert in flink sql, then I really need your help...

2018-12-04 Thread Timo Walther
Unfortunately, setting the parallelism per SQL operator is not supported 
right now.



We are currently thinking about a way of having fine-grained control 
over properties of SQL operators, but this is in an early design phase 
and might take a while.




Am 04.12.18 um 13:05 schrieb clay:

hi Timo:

First of all, thank you very much, I have solved the problems.

Regarding the problem of the state being too large, I set the global parallelism to 7
for the program, which solved my problem very well; checkpointing is now very fast.
But I would like to ask if there is a way to set the parallelism for each
operator (translated from the SQL statement) instead of a global setting?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Discuss [FLINK-9740] Support group windows over intervals of months

2018-12-06 Thread Timo Walther

Hi,

thanks for working on this. The user mailing list is not the right place 
to start development discussions. Please use the dev@ mailing list. Can 
you attach your design to the Jira issue? We can then discuss it further there.


Thanks,
Timo


Am 06.12.18 um 09:45 schrieb x1q1j1:

Hi Timo Walther, Jark,

Thank you very much. I have some thoughts and ideas about this 
issue, which are attached in the email.

https://issues.apache.org/jira/browse/FLINK-9740

Best wishes.
Yours truly,
ForwardXu





Re: Run simple flink application via "java -jar"

2018-12-06 Thread Timo Walther

Hi Krishna,

yes, this should work given that you included all dependencies that are 
marked as "provided" in a Flink example project. In general, when you 
develop a Flink application, you can simply press the run button in 
your IDE. This will start a mini cluster locally for debugging purposes.
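For reference, a minimal sketch of a job that can be started with `java -jar my-job.jar`, assuming the Flink dependencies (flink-streaming-java, flink-clients) are bundled into the fat JAR instead of being marked "provided"; the class name and sample data are illustrative:

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class LocalWordCount {
    public static void main(String[] args) throws Exception {
        // Outside of a cluster, this starts a local mini cluster in the same JVM.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to be", "or not to be")
            .flatMap((String line, Collector<String> out) -> {
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            })
            .returns(Types.STRING)
            .print();

        env.execute("local-word-count");
    }
}
```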


However, if your goal is just to run Flink as a standalone application, 
you can still use `./bin/start-cluster.sh` and `./bin/flink run`. By default, the 
start-cluster script starts a single task manager with one slot for 
standalone mode.


Regards,
Timo

Am 06.12.18 um 14:36 schrieb Krishna Kalyan:

Hello,

This is a very n00b question. Can we run a flink job (for example 
wordcount) using "java -jar " in standalone mode. I 
usually see examples using  "$FLINK_HOME/bin/flink run  
". If yes can someone please point me to an example.


Regards,
Krishna

Locations in Stuttgart and Berlin · Zoi 
TechCon GmbH · Quellenstr. 7 · 70376 Stuttgart · Managing directors: 
Benjamin Hermann, Dr. Daniel Heubach. District court Stuttgart HRB 
759619, place of jurisdiction Stuttgart. This information is added 
automatically and does not allow any conclusions about the legal 
nature of the e-mail. This message (including any 
attachments) contains confidential information intended for a specific 
individual and purpose, and is protected by law. If you are not the 
intended recipient, you should delete this message. Any disclosure, 
copying, or distribution of this message, or the taking of any action 
based on it, is strictly prohibited.






Re: sql program throw exception when new kafka with csv format

2018-12-11 Thread Timo Walther

Hi Marvin,

the CSV format is not supported for Kafka so far. Only formats that have 
the tag `DeserializationSchema` in the docs are supported.


Right now you have to implement your own DeserializationSchemaFactory or 
use JSON or Avro.


You can follow [1] to get informed once the CSV format is supported. I'm 
sure it will be merged for Flink 1.8.


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-7050


Am 11.12.18 um 10:41 schrieb Marvin777:
Register kafka message source with csv format,  the error message is 
as follows:


Exception in thread "main"
org.apache.flink.table.api.NoMatchingTableFactoryException: Could
not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.


Reason: No context matches.


BTW, the flink version is 1.6.2 .

Thanks Marvin.






Re: SANSA 0.5 (Scalable Semantic Analytics Stack) Released

2018-12-14 Thread Timo Walther

Hi,

looks like a very useful extension to Flink. Thanks for letting us know!

You can also use the commun...@flink.apache.org mailing list to spread 
the news because the user@ list is more for user support questions and help.


Regards,
Timo

Am 14.12.18 um 09:23 schrieb GezimSejdiu:

Dear all,

The Smart Data Analytics group (http://sda.tech) is happy to announce SANSA
0.5 - the fifth release of the Scalable Semantic Analytics Stack. SANSA
employs distributed computing via Apache Spark and Apache Flink in order to
allow scalable machine learning, inference and querying capabilities for
large knowledge graphs.

Website: http://sansa-stack.net
GitHub: https://github.com/SANSA-Stack
Download: https://github.com/SANSA-Stack/SANSA-Stack/releases

We included support for heterogeneous data sources via a new data lake
concept, added new clustering algorithms, support for the Ontop virtual RDF
graph tool and many more features in this release. You can find the FAQ and
usage examples at http://sansa-stack.net/faq/.

View this announcement on Twitter and the Sansa blog:
   http://sansa-stack.net/sansa-0-5/
   https://twitter.com/SANSA_Stack/status/1072897902828285953

Kind regards,

The SANSA Development Team
(http://sansa-stack.net/community/#Contributors)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-03 Thread Timo Walther

Hi Hao,

which Flink version are you using? What do you mean by "suddenly"? Did 
it work before?


Regards,
Timo


Am 03.01.19 um 07:13 schrieb Hao Sun:
Yep, javap shows the class is there, but FlinkUserCodeClassLoaders 
somehow could not find it suddenly


javap -cp /opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar 
'com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45'

Compiled from "ConnectedStreams.scala"
public final class 
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45 
extends 
org.apache.flink.api.scala.typeutils.CaseClassSerializer 
{
public com.zendesk.fraudprevention.datatypes.MaxwellEvent 
createInstance(java.lang.Object[]);
public 
org.apache.flink.api.scala.typeutils.CaseClassSerializer 
createSerializerInstance(java.lang.Class, 
org.apache.flink.api.common.typeutils.TypeSerializer[]);
public org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase 
createSerializerInstance(java.lang.Class, 
org.apache.flink.api.common.typeutils.TypeSerializer[]);

public java.lang.Object createInstance(java.lang.Object[]);
public 
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45(com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90, 
org.apache.flink.api.common.typeutils.TypeSerializer[]);

}

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Wed, Jan 2, 2019 at 6:04 PM qi luo wrote:


Hi Hao,

Since Flink is using a child-first class loader, you may try searching
for the class
"com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45"
in your fat JAR. Is that an inner class?

Best,
Qi


On Jan 3, 2019, at 7:01 AM, Hao Sun <ha...@zendesk.com> wrote:

Hi,

I am wondering if there are any protips to figure out what class
is not found?

= Logs 
org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Could not instantiate chained outputs.
at

org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at

org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at

org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:77)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at

org.apache.flink.util.InstantiationUtil.deserializeObject(Instanti

Re: The way to write a UDF with generic type

2019-01-04 Thread Timo Walther

Hi Yinhua,

Flink needs to know how to serialize and deserialize a type `T`. If you 
are using a type variable here, Flink can not derive the type information.


You need to override 
org.apache.flink.table.functions.AggregateFunction#getResultType and 
return type information that matches.
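A sketch of one way to do this for a single generic implementation that can be registered for several concrete types (this is not how the built-in MAX works; the class name, the Row-based accumulator and the constructor-provided TypeInformation are illustrative choices):

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class MyMax<T extends Comparable<T>> extends AggregateFunction<T, Row> {

    private final TypeInformation<T> type;

    public MyMax(TypeInformation<T> type) {
        this.type = type;
    }

    @Override
    public Row createAccumulator() {
        return new Row(1); // field 0 holds the current maximum (null = no value yet)
    }

    @SuppressWarnings("unchecked")
    public void accumulate(Row acc, Object value) {
        if (value == null) {
            return;
        }
        T typed = (T) value;
        T current = (T) acc.getField(0);
        if (current == null || typed.compareTo(current) > 0) {
            acc.setField(0, typed);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T getValue(Row acc) {
        return (T) acc.getField(0);
    }

    @Override
    public TypeInformation<T> getResultType() {
        return type; // concrete result type, since T cannot be extracted
    }

    @Override
    public TypeInformation<Row> getAccumulatorType() {
        return Types.ROW(type); // concrete accumulator type for state serialization
    }
}

// tableEnv.registerFunction("MYLONGMAX", new MyMax<>(Types.LONG));
// tableEnv.registerFunction("MYDOUBLEMAX", new MyMax<>(Types.DOUBLE));
```

Note that each concrete type still needs its own registered name, since aggregate functions cannot be overloaded so far.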


Regards,
Timo



Am 04.01.19 um 10:28 schrieb yinhua.dai:

Hi Chesnay,

Maybe you misunderstand my question.
I have below code:
public class MyMaxAggregation<T> extends AggregateFunction<T, MyMaxAggregation.MyAccumulator> {
   @Override
   public MyAccumulator createAccumulator() {
 return new MyAccumulator();
   }

   @Override
   public T getValue(MyAccumulator accumulator) {
 return null;
   }

   static class MyAccumulator {
 double maxValue;
   }

}

But tableEnv.registerFunction("MYMAX", new MyMaxAggregation());
will throw exception as below:
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Type of
TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be
determined. This is most likely a type erasure problem. The type extraction
currently supports types with generic variables only in cases where all
variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
at
org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: The way to write a UDF with generic type

2019-01-07 Thread Timo Walther
Currently, there is no more flexible approach for aggregate functions. 
Scalar functions can be overloaded, but aggregate functions do not 
support this so far.


Regards,
Timo


Am 07.01.19 um 02:27 schrieb yinhua.dai:

Hi Timo,

But getResultType should only return a concrete TypeInformation, right?
How could I implement it with a generic type?

I'd like to clarify my questions again.
Say I want to implement my own "MAX" function, but I want to apply it to
different types, e.g. integer, long, double etc., so I tried to write a class
which extends AggregateFunction with a generic type to implement the max
function.

Then I want to register only one function name for all types.
E.g.
tableEnv.registerFunction("MYMAX", new MyMax());
instead of
tableEnv.registerFunction("MYINTEGERMAX", new MyIntegerMax());
tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
tableEnv.registerFunction("MYDOULBEMAX", new MyDoubleMax());

Is there a way to implement that?
I know the build in function "MAX" can apply to all types, so I wonder if I
can also implement that.
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-07 Thread Timo Walther

Hi Henry,

such a feature is currently under discussion [1]; feel free to 
participate there and give feedback. So far you need some 
intermediate store, which usually could be Kafka or a filesystem.


I would recommend writing small unit tests that check each SQL step, 
like it is done here [2].
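For quick local debugging, an intermediate view can also be printed without defining a dedicated sink. A sketch (following Henry's pseudo-SQL below, and assuming a Java StreamTableEnvironment `tableEnv`, the job's StreamExecutionEnvironment `env`, and org.apache.flink.types.Row):

```
// Step under test, kept as a named view so later steps can still query it.
Table a = tableEnv.sqlQuery("SELECT * FROM source WHERE xxx");
tableEnv.registerTable("A", a);

// Print the changelog of view A to stdout: (true, row) = insert, (false, row) = retract.
tableEnv.toRetractStream(a, Row.class).print();

env.execute("debug-view-A");
```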


I hope this helps.

Regards,
Timo

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Interactive-Programming-in-Flink-Table-API-tt25372.html#a25666
[2] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala



Am 07.01.19 um 16:19 schrieb 徐涛:

Hi Expert,
When we write a Flink SQL program, we usually need to use 
multiple tables to get the final result. Sometimes this is because it is not 
possible to implement complicated logic in one SQL statement, sometimes it is for 
clarity of the logic. For example:
create view A as
select * from source where xxx;

create view B as
select * from A where xxx;

create view C as
select * from B where xxx;

insert into sink
select * from C where xxx;

But when we write complicated logic, we may accomplish it step by step: 
make sure that the first step is correct, then go on with the next step. In 
batch programs such as Hive or Spark, we usually write SQL like this, step by 
step.

For example:
create view A as
select * from source where xxx;
I want to check whether the content of A is correct; if it is, I go 
on to write the next SQL statement. But I do not want to define a sink for each step, 
because it is not worth creating a sink just for such a "debug" step.
So is there a solution or best practice for such a scenario? How do we 
easily debug or verify the correctness of a Flink SQL program?

Best
Henry





Re: Reducing runtime of Flink planner

2019-01-07 Thread Timo Walther

Hi Niklas,

it would be interesting to know which planner caused the long runtime. 
Could you use a debugger to figure out more details? Is it really the 
Flink Table API planner, or the underlying DataSet planner one level deeper?


There was an issue that was recently closed [1] about the DataSet 
optimizer. Could this solve your problem?


I will also loop in Fabian who might know more.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10566

Am 07.01.19 um 14:05 schrieb Niklas Teichmann:

Hi everybody,

I have a question concerning the planner for the Flink Table / Batch API.
At the moment I try to use a library called Cypher for Apache Flink, a 
project that tries to implement
the graph database query language Cypher on Apache Flink (CAPF, 
https://github.com/soerenreichardt/cypher-for-apache-flink).


The problem is that the planner seemingly takes a very long time to 
plan and optimize the job created by CAPF. This example job in json 
format


https://pastebin.com/J84grsjc

takes on a 24 GB data set about 20 minutes to plan and about 5 minutes 
to run the job. That seems very long for a job of this size.


Do you have any idea why this is the case?
Is there a way to give the planner hints to reduce the planning time?

Thanks in advance!
Niklas





Re: Buffer stats when Back Pressure is high

2019-01-07 Thread Timo Walther

Hi Gagan,

a typical solution to such a problem is to introduce an artificial key 
(enrichment id + some additional suffix); you can then keyBy on this 
artificial key and thus spread the workload more evenly. Of course you 
need to make sure that records of the second stream are duplicated to 
all operators with the same artificial key.


Depending on the frequency of the second stream, it might also be worth 
using a broadcast join that distributes the second stream to all operators, 
such that all operators can perform the enrichment step in a round-robin 
fashion.
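A minimal, runnable sketch of the key-salting idea (the fan-out factor, sample elements and stream contents are illustrative):

```
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SaltedKeySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final int fanOut = 8; // number of artificial sub-keys per enrichment id

        // High-volume, skewed stream of (enrichmentId, payload).
        DataStream<Tuple2<String, String>> events =
                env.fromElements(Tuple2.of("hot-id", "event-1"), Tuple2.of("hot-id", "event-2"));

        // Low-volume enrichment stream of (enrichmentId, enrichmentValue).
        DataStream<Tuple2<String, String>> enrichments =
                env.fromElements(Tuple2.of("hot-id", "enrichment"));

        // Spread the hot key: append a random suffix so records go to different subtasks.
        DataStream<Tuple2<String, String>> saltedEvents = events
                .map(t -> Tuple2.of(t.f0 + "#" + ThreadLocalRandom.current().nextInt(fanOut), t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.STRING));

        // Duplicate each enrichment record to every sub-key so a keyed join still matches.
        DataStream<Tuple2<String, String>> duplicatedEnrichments = enrichments
                .flatMap((Tuple2<String, String> t, Collector<Tuple2<String, String>> out) -> {
                    for (int i = 0; i < fanOut; i++) {
                        out.collect(Tuple2.of(t.f0 + "#" + i, t.f1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.STRING));

        // Both streams can now be unioned or connected and keyed on f0 (the artificial key).
        saltedEvents.print();
        duplicatedEnrichments.print();
        env.execute("salted-key-sketch");
    }
}
```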


Regards,
Timo

Am 07.01.19 um 14:45 schrieb Gagan Agrawal:

Flink Version is 1.7.
Thanks Zhijiang for your pointer. Initially I was checking only for 
few. However I just checked for all and found couple of them having 
queue length of 40+ which seems to be due to skewness in data. Is 
there any general guide lines on how to handle skewed data? In my case 
I am taking union and then keyBy (with custom stateful Process 
function) on enrichment id of 2 streams (1 enrichment stream with low 
volume and another regular data stream with high volume). I see that 
30% of my data stream records have same enrichment Id and hence go to 
same tasks which results in skewness. Any pointers on how to handle 
skewness while doing keyBy would be of great help.


Gagan

On Mon, Jan 7, 2019 at 3:25 PM zhijiang wrote:


Hi Gagan,

What Flink version do you use? And have you checked
buffers.inputQueueLength for all the parallel instances of B
(connected with A)? There may be a scenario where only one
parallel instance of B has a full input queue, which back-pressures A,
while the input queues of the other B instances are empty.

Best,
Zhijiang

--
From: Gagan Agrawal <agrawalga...@gmail.com>
Send Time: 2019-01-07 (Monday) 12:06
To: user <user@flink.apache.org>
Subject: Buffer stats when Back Pressure is high

Hi,
I want to understand whether any of the buffer stats help in
debugging / validating that the downstream operator is performing
slowly when back pressure is high. Say I have A -> B operators
and A shows high back pressure, which indicates something is wrong
or not performing well on B's side, which slows down
operator A. However, when I look at buffers.inputQueueLength
for operator B, it's 0. My understanding is that when B is
processing slowly, its input buffer will be full of incoming
messages, which ultimately blocks/slows down the upstream operator
A. However, that doesn't seem to be happening in my case. Can
someone throw some light on how the different stats around
buffers (e.g. buffers.inPoolUsage, buffers.inputQueueLength,
numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) should look
when the downstream operator is performing slowly?

Gagan






Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Timo Walther

Hi Puneet,

maybe you can show us or explain a bit more about your pipeline. From 
what I see, your ProcessFunction looks correct. Are you sure the 
registration takes place?


Regards,
Timo

Am 07.01.19 um 14:15 schrieb Puneet Kinra:

Hi Hequn

It's a streaming job.

On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng wrote:


Hi Puneet,

The value of the registered timer should be within the start time and
end time of your job. For example, the job starts at processing time t1
and stops at processing time t2. You have to make sure t1 <
`parseLong + 5000` < t2.
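A common fix (sketch) is to register the timer relative to the current processing time inside processElement, so that it always falls within the job's lifetime:

```
// Inside processElement(...): fire 5 seconds after the element arrives.
long fireAt = ctx.timerService().currentProcessingTime() + 5000L;
ctx.timerService().registerProcessingTimeTimer(fireAt);
```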

Best, Hequn

On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <puneet.ki...@customercentria.com> wrote:

Hi All

Facing some issue with context to onTimer method in
processfunction

class TimerTest extends ProcessFunction<Tuple2<String, String>, String> {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void processElement(Tuple2<String, String> arg0,
ProcessFunction<Tuple2<String, String>, String>.Context ctx,
Collector<String> arg2) throws Exception {
// TODO Auto-generated method stub
long parseLong = Long.parseLong(arg0.f1);
TimerService timerService = ctx.timerService();
ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
}

@Override
public void onTimer(long timestamp,
ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Executing timmer"+timestamp);
out.collect("Timer Testing..");
}
}

--
Cheers,
Puneet Kinra
Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
e-mail: puneet.ki...@customercentria.com




--
Cheers,
Puneet Kinra
Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
e-mail: puneet.ki...@customercentria.com







Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

2019-01-07 Thread Timo Walther

Hi Wei,

did you play around with the classloading options mentioned here [1]? The -d 
option might impact how classes are loaded when the job is deployed on 
the cluster.

I will loop in Gary who might know more about the YARN behavior.

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#user-jars--classpath



Am 07.01.19 um 10:33 schrieb Wei Sun:

Hi guys,

Good day.

I rebuilt flink from the source and specified the vendor specific 
Hadoop version. It works well when i just submit a streaming 
application  without '-d'(--detached) option as follows:
bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 
3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter 
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf


But if I add the '-d' (--detached) option, an 
'org.apache.flink.client.deployment.ClusterDeploymentException' will 
be thrown to the CLI, just as:

bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 
2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter 
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf


Exception start

 The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)

at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: 
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during 
deployment.
Diagnostics from YARN: Application application_1544777537685_0068 
failed 2 times due to AM Container for 
appattempt_1544777537685_0068_02 exited with  exitCode: 1
For more detailed output, check application tracking 
page:http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/Then, 
click on links to logs of each attempt.

Diagnostics: Exception from container-launch.
Container id: container_e03_1544777537685_0068_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to 
further investigate the issue:

yarn logs -applicationId application_1544777537685_0068
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)

... 9 more
2019-01-07 17:08:55,463 INFO 
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cancelling 
deployment from Deployment Failure Hook
2019-01-07 17:08:55,464 INFO 
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Killing YARN 
application
2019-01-07 17:08:55,471 INFO 
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deleting 
files in

--- End ---

My cluster has log aggregation enabled, so I executed the following 
command:
yarn logs -applicationId applic

Re: The way to write a UDF with generic type

2019-01-08 Thread Timo Walther
Currently, this functionality is hard-coded in the aggregation 
translation. Namely in 
`org.apache.flink.table.runtime.aggregate.AggregateUtil#transformToAggregateFunctions` 
[1].


Timo

[1] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala


Am 08.01.19 um 06:41 schrieb yinhua.dai:

Hi Timo,

Can you let me know how the built-in "MAX" function is able to support
different types?
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-11 Thread Timo Walther

Hi Jashua,

according to the property list, you passed "connector.version=0.10" so a 
Kafka 0.8 factory will not match.


Are you sure you are compiling the right thing? There seems to be a 
mismatch between your screenshot and the exception.


Regards,
Timo

Am 11.01.19 um 15:43 schrieb Joshua Fan:

Hi,

I want to test Flink SQL locally by consuming Kafka data in Flink 1.7, 
but it throws an exception like the one below.


Exception in thread "main"
org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.StreamTableSourceFactory' in

the classpath.


Reason: No context matches.


The following properties are requested:

connector.properties.0.key=fetch.message.max.bytes

connector.properties.0.value=10485760

connector.properties.1.key=zookeeper.connect

connector.properties.1.value=10.xxx.:2181/kafka

connector.properties.2.key=group.id 

connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21

connector.properties.3.key=bootstrap.servers

connector.properties.3.value=10.xxx:9092

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=-flink-test

connector.type=kafka

connector.version=0.10

format.derive-schema=true

format.property-version=1

format.type=json

schema.0.name=rideId

schema.0.type=VARCHAR

schema.1.name=lon

schema.1.type=VARCHAR


The following factories have been considered:


org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory

org.apache.flink.table.sources.CsvBatchTableSourceFactory

org.apache.flink.table.sources.CsvAppendTableSourceFactory

org.apache.flink.table.sinks.CsvBatchTableSinkFactory

org.apache.flink.table.sinks.CsvAppendTableSinkFactory

org.apache.flink.formats.json.JsonRowFormatFactory


at

org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)

at

org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)

at

org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)

at

org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)

at

org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)

at TableSourceFinder.main(TableSourceFinder.java:40)


here is my code:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);

    Kafka kafka = new Kafka();
    Properties properties = new Properties();
    String zkString = "10.xxx:2181/kafka";
    String brokerList = "10.xxx:9092";
    properties.setProperty("fetch.message.max.bytes", "10485760");
    properties.setProperty("group.id", UUID.randomUUID().toString());
    properties.setProperty("zookeeper.connect", zkString);
    properties.setProperty("bootstrap.servers", brokerList);

    kafka.version("0.8").topic("flink-test").properties(properties);
    kafka.startFromLatest();

    stEnv.connect(kafka)
        .withSchema(new Schema()
            .field("rideId", Types.STRING())
            .field("lon", Types.STRING()))
        .withFormat(new Json().deriveSchema())
        .registerTableSource("test");

    Table table = stEnv.sqlQuery("select rideId from test");
    DataStream<String> ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
        .toAppendStream(table, Types.STRING());
    ds.print();
    env.execute("KafkaSql");
}



And here is my pom.xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

In my opinion, I have all the required libraries in the pom, so I don't know why 
it fails when tested locally.


Thank you for any hints.

Yours
Joshua





Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Timo Walther
+1 for Stephan's suggestion. For example, SQL connectors have never been 
part of the main distribution and nobody complained about this so far. I 
think what is more important than a big dist bundle is a helpful 
"Downloads" page where users can easily find available filesystems, 
connectors, and metric reporters. Not everyone checks Maven central for 
available JAR files. I just saw that we added an "Optional components" 
section recently [1]; we just need to make it more prominent. This is 
also done for the SQL connectors and formats [2].


[1] https://flink.apache.org/downloads.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#dependencies


Regards,
Timo


Am 23.01.19 um 10:07 schrieb Ufuk Celebi:

I like the idea of a leaner binary distribution. At the same time I
agree with Jamie that the current binary is quite convenient and
connection speeds should not be that big of a deal. Since the binary
distribution is one of the first entry points for users, I'd like to
keep it as user-friendly as possible.

What do you think about building a lean distribution by default and a
"full" distribution that still bundles all the optional dependencies
for releases? (If you don't think that's feasible I'm still +1 to only
go with the "lean dist" approach.)

– Ufuk

On Wed, Jan 23, 2019 at 9:36 AM Stephan Ewen  wrote:

There are some points where a leaner approach could help.
There are many libraries and connectors that are currently being adding to
Flink, which makes the "include all" approach not completely feasible in
long run:

   - Connectors: For a proper experience with the Shell/CLI (for example for
SQL) we need a lot of fat connector jars.
 These come often for multiple versions, which alone accounts for 100s
of MBs of connector jars.
   - The pre-bundled FileSystems are also on the verge of adding 100s of MBs
themselves.
   - The metric reporters are bit by bit growing as well.

The following could be a compromise:

The flink-dist would include
   - the core flink libraries (core, apis, runtime, etc.)
   - yarn / mesos  etc. adapters
   - examples (the examples should be a small set of self-contained programs
without additional dependencies)
   - default logging
   - default metric reporter (jmx)
   - shells (scala, sql)

The flink-dist would NOT include the following libs (and these would be
offered for individual download)
   - Hadoop libs
   - the pre-shaded file systems
   - the pre-packaged SQL connectors
   - additional metric reporters


On Tue, Jan 22, 2019 at 3:19 AM Jeff Zhang  wrote:


Thanks Chesnay for raising this discussion thread.  I think there are 3
major use scenarios for flink binary distribution.

1. Use it to set up standalone cluster
2. Use it to experience features of flink, such as via scala-shell,
sql-client
3. Downstream project use it to integrate with their system

I did a size estimation of flink dist folder, lib folder take around 100M
and opt folder take around 200M. Overall I agree to make a thin flink dist.
So the next problem is which components to drop. I check the opt folder,
and I think the filesystem components and metrics components could be moved
out, because they are pluggable components and are only used in scenario 1, I
think (setting up a standalone cluster). Other components like flink-table,
flink-ml, flink-gelly we should still keep IMHO, because new users may
still use them to try the features of Flink. For me, scala-shell is the first
option to try new features of Flink.



Fabian Hueske wrote on Fri, Jan 18, 2019 at 7:34 PM:


Hi Chesnay,

Thank you for the proposal.
I think this is a good idea.
We follow a similar approach already for Hadoop dependencies and
connectors (although in application space).

+1

Fabian

Am Fr., 18. Jan. 2019 um 10:59 Uhr schrieb Chesnay Schepler <
ches...@apache.org>:


Hello,

the binary distribution that we release by now contains quite a lot of
optional components, including various filesystems, metric reporters and
libraries. Most users will only use a fraction of these, and as such
pretty much only increase the size of flink-dist.

With Flink growing more and more in scope I don't believe it to be
feasible to ship everything we have with every distribution, and instead
suggest more of a "pick-what-you-need" model, where flink-dist is rather
lean and additional components are downloaded separately and added by
the user.

This would primarily affect the /opt directory, but could also be
extended to cover flink-dist. For example, the yarn and mesos code could
be spliced out into separate jars that could be added to lib manually.

Let me know what you think.

Regards,

Chesnay



--
Best Regards

Jeff Zhang





Re: Is there a way to get all flink build-in SQL functions

2019-01-25 Thread Timo Walther
The problem right now is that Flink SQL has two stacks for defining 
functions. One is the built-in function stack that is based on Calcite 
and the other are the registered UDFs.


What you can do is to use 
FunctionCatalog.withBuiltIns.getSqlOperatorTable() for listing Calcite 
built-in functions and TableEnvironment.listFunctions() for registered UDFs.


I hope this helps.

Regards,
Timo

Am 25.01.19 um 10:17 schrieb Jeff Zhang:
I believe it makes sense to list available UDFs programmatically, e.g. 
users may want to see the available UDFs in the sql-client. It would also 
benefit other downstream projects that use Flink SQL. Besides that, I 
think Flink should also provide an API for querying the description of 
a UDF and how to use it.


yinhua.dai <yinhua.2...@outlook.com> wrote on Fri, Jan 25, 2019 at 5:12 PM:


Thanks Guys.
I just wondering if there is another way except hard code the list:)
Thanks anyway.



--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



--
Best Regards

Jeff Zhang





Re: date format in Flink SQL

2019-01-29 Thread Timo Walther

Hi Soheil,

the functions for date/time conversion are pretty limited so far. The 
full list of supported functions can be found here [1]. If you need more 
(which is usually the case), it is easy to implement a custom function [2].


We rely on Java's java.sql.Date as a data type. You can use `SELECT DATE 
'2018-09-23'` to create literals.
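A sketch of such a custom function for loosely formatted input like "1996-8-01" (the class name and accepted pattern are assumptions, not an existing Flink function):

```
import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

import org.apache.flink.table.functions.ScalarFunction;

public class ParseDate extends ScalarFunction {

    // Accepts a one- or two-digit month, e.g. "1996-8-01".
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-M-dd");

    public Date eval(String s) {
        if (s == null) {
            return null;
        }
        // java.sql.Date maps to the SQL DATE type in the Table API / SQL.
        return Date.valueOf(LocalDate.parse(s, FORMAT));
    }
}

// tableEnv.registerFunction("PARSE_DATE", new ParseDate());
// tableEnv.sqlQuery("SELECT * FROM T WHERE PARSE_DATE(date_column) < DATE '1996-08-01'");
```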


I hope this helps.

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions


Am 29.01.19 um 09:42 schrieb Soheil Pourbafrani:

Hi,

I want to convert a string in the format of 1996-8-01 to a date and 
create a Table from the dataset of Tuple3 at the 
end. Since I want to apply SQL queries on the date field of the table, 
for example "date_column < 1996-8-01", which Java date format is 
supported in Flink?





Re: SQL Client (Streaming + Checkpoint)

2019-01-29 Thread Timo Walther

Hi Vijay,

in general Yun is right, the SQL Client is still in an early prototyping 
phase. Some configuration features are missing.


You can track the progress of this feature here: 
https://issues.apache.org/jira/browse/FLINK-10265


It should be possible to use the global Flink configuration for now as a 
workaround:


https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#checkpointing

Regards,
Timo

Am 28.01.19 um 18:41 schrieb Yun Tang:

Hi Vijay

Unfortunately, the current sql-client does not support configuring 
checkpointing. The current execution properties for the sql-client support 
both batch and streaming environments, while the batch environment does not 
support checkpoints. I see the current sql-client as a tool for 
prototyping, not production-ready yet.


Best
Yun Tang

From: Vijay Srinivasaraghavan
Sent: Tuesday, January 29, 2019 0:53
To: User
Subject: SQL Client (Streaming + Checkpoint)
It looks like the SQL client does not enable checkpointing 
when submitting the streaming job query. Did anyone notice this 
behavior? FYI, I am using the 1.6.x branch.


Regards
Vijay





Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-01-29 Thread Timo Walther

Hi Chris,

the exception message is a bit misleading. The time attribute (time 
indicator) type is an internal type and should not be used by users.


The following line should solve your issue. Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = 
tableEnv.toRetractStream(tradesByInstr, typeInfo);


You can do

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = 
tableEnv.toRetractStream(tradesByInstr, Row.class);


The API will automatically insert the right types for the table passed 
when using a plain `Row.class`.


I hope this helps.

Regards,
Timo



Am 25.01.19 um 20:14 schrieb Chris Miller:
I'm trying to group some data and then enrich it by joining with a 
temporal table function, however my test code (attached) is failing 
with the error shown below. Can someone please give me a clue as to 
what I'm doing wrong?


Exception in thread "main" java.lang.AssertionError: mismatched type 
$5 TIMESTAMP(3)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)

    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
    at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
    at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
    at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
    at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
    at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
    at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
    at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
    at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
    at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
    at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
    at test.Test.main(Test.java:78) 





Re: UDAF Flink-SQL return null would lead to checkpoint fails

2019-01-30 Thread Timo Walther

Hi Henry,

could you share a little reproducible example? From what I see you are 
using a custom aggregate function with a case class inside, right? 
Flink's case class serializer does not support null because the usage of 
`null` is also not very Scala like.


Use a `Row` type for supporting nulls properly.
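A minimal sketch of what this could look like, assuming the UDAF keeps a latest value plus its update time (the class name, field layout and signatures are illustrative, not the original function):

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class LatestValue extends AggregateFunction<Long, Row> {

    @Override
    public Row createAccumulator() {
        return new Row(2); // field 0 = latest value (may stay null), field 1 = last update time
    }

    public void accumulate(Row acc, Long value, Long updateTime) {
        Long lastTime = (Long) acc.getField(1);
        if (lastTime == null || (updateTime != null && updateTime > lastTime)) {
            acc.setField(0, value);
            acc.setField(1, updateTime);
        }
    }

    @Override
    public Long getValue(Row acc) {
        return (Long) acc.getField(0); // may be null; the Row serializer can handle that
    }

    @Override
    public TypeInformation<Row> getAccumulatorType() {
        return Types.ROW(Types.LONG, Types.LONG);
    }
}
```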

Hope this helps.

Timo


Am 30.01.19 um 12:35 schrieb 徐涛:

Hi Experts,
In my self-defined UDAF, I found that returning a null value causes the
checkpoint to fail; the following is the error log.
I think it is quite a common case to return a null value in a UDAF,
because sometimes no value can be determined. Why does Flink have such a
limitation on the UDAF return value?

Thanks a lot!


org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: 
Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: 
(DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: 
(CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS 
EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: 
(DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) 
AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, 
_UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.types.NullFieldException: Field 0 is null, but expected to 
hold a value.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 common frames omitted
Caused by: org.apache.flink.types.NullFieldException: Field 0 is null, but 
expected to hold a value.
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:116)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:47)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.lambda$getKeyGroupWriter$0(CopyOnWriteStateTableSnapshot.java:148)
at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResult.writeStateInKeyGroup(KeyGroupPartitioner.java:261)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:757)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 common frames omitted
Caused by: java.lang.NullPointerException: null
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:69)
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
... 17 common frames omitted



Best
Henry





Re: Table API zipWithIndex

2019-02-01 Thread Timo Walther

Hi Flavio,

I guess you are looking for a unique identifier for rows, right? 
Currently, this is not possible in Table API. There, we only support 
UUID(). Once the Table API has been enhanced to be more interactive, we 
might support such features.


Regards,
Timo

Am 01.02.19 um 11:16 schrieb Flavio Pompermaier:

Hi to all,
is there any plan to support the equivalent 
of DataSetUtils.zipWithIndex as a select expression in the Table API? 
is it possible to implement it as an UDF right now?


Best,
Flavio





Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-02-06 Thread Timo Walther

Hi Chris,

the error that you've observed is a bug that might be related to another 
bug that is not easily solvable.


I created an issue for it nevertheless: 
https://issues.apache.org/jira/browse/FLINK-11543


In general, I think you need to adapt your program in any case. Because 
you are aggregating on a rowtime attribute, it will lose its time 
attribute property and become a regular timestamp. Thus, you can't use 
it for a temporal table join.


Maybe the following training from the last Flink Forward conference might 
help you. I would recommend the slide set there to understand the 
difference between streaming operations and what we call "materializing" 
operations:


https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams

I hope this helps. Feel free to ask further questions.

Regards,
Timo

Am 05.02.19 um 11:30 schrieb Chris Miller:
Exception in thread "main" java.lang.AssertionError: mismatched type 
$5 TIMESTAMP(3)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)

    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
    at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
    at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
    at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
    at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
    at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
    at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
    at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
    at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
    at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
    at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
    at test.Test.main(Test.java:78) 





Re: FLIP-16, FLIP-15 Status Updates?

2019-02-19 Thread Timo Walther

Hi John,

you are right that there has not been much progress in the last years around 
these two FLIPs, mostly due to a shift of priorities. However, with the 
big Blink code contribution from Alibaba and joint development forces 
for a unified batch and streaming runtime [1], it is very likely that 
iterations, and thus machine learning algorithms, will also see more 
development effort.


The community is working on a roadmap page for the website, and I can 
already reveal that a new iterations model is mentioned there. The new 
Flink roadmap page can be expected in the next 2-3 weeks.


I hope this information helps.

Regards,
Timo

[1] 
https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html


Am 19.02.19 um 12:47 schrieb John Tipper:

Hi All,

Does anyone know what the current status is for FLIP-16 (loop fault tolerance) 
and FLIP-15 (redesign iterations) please? I can see lots of work back in 2016, 
but it all seemed to stop and go quiet since about March 2017. I see iterations 
as offering very interesting capabilities for Flink, so it would be good to 
understand how we can get this moving again.

Many thanks,

John

Sent from my iPhone





Re: flink sql about nested json

2019-03-04 Thread Timo Walther

Hi,

the Flink SQL JSON format supports nested formats like the schema that you 
posted. Maybe the renaming with `from()` does not work as expected. Did you 
try it without the `from()`, with the schema fields equal to the JSON fields?


Alternatively, you could also define the schema only and use the 
`deriveSchema()` mode of the format.
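For reference, a rough sketch of both ideas (the descriptor calls below follow the 1.7/1.8 API; the topic name and connector settings are placeholders): the nested object can be declared as a ROW type in the table schema, and the JSON format can derive its format schema from it.

// Sketch only: connector settings are placeholders, field names follow the JSON schema in the quoted mail.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

tableEnv.connect(new Kafka().version("universal").topic("my-topic") /* ... further properties ... */)
    .withFormat(new Json().deriveSchema())          // derive the JSON format from the table schema
    .withSchema(new Schema()
        .field("table", Types.STRING)
        .field("str2", Types.STRING)
        .field("obj1", Types.ROW_NAMED(
            new String[] {"rkey", "val", "lastTime"},
            Types.STRING, Types.STRING, Types.BIG_DEC)))
    .inAppendMode()
    .registerTableSource("myNestedTable");

// Nested fields can then be referenced directly, e.g. SELECT obj1.rkey, obj1.val FROM myNestedTable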


Btw there is a big bug in the JSON format that could affect how rows are 
parsed (https://issues.apache.org/jira/browse/FLINK-11727).


Maybe it is worth it to write your own format and perform the JSON 
parsing logic the way you would like it.


Regards,
Timo

Am 04.03.19 um 08:38 schrieb 杨光:

Hi,
I am trying the Flink SQL API to read JSON-formatted data from a Kafka topic.
My JSON schema is nested, like this:
{
  "type": "object",
  "properties": {
    "table": {
      "type": "string"
    },
    "str2": {
      "type": "string"
    },
    "obj1": {
      "type": "object",
      "properties": {
        "rkey": {
          "type": "string"
        },
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "rkey", "val"]
    },
    "obj2": {
      "type": "object",
      "properties": {
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "val"]
    }
  },
  "required": ["table", "str2", "obj1", "obj2"]
}

I define a table schema like this:

Schema schemaDesc1 = new Schema()
        ...
        .field("tablestr", Types.STRING).from("table")
        ...
        .field("rkey", Types.STRING).from("rkey");


When I run a debug case, I get an error about the "rkey" field (the field 
in the nested obj1):
" SQL validation failed. Table field 'rkey' was resolved to 
TableSource return type field 'rkey', but field 'rkey' was not found 
in the return type Row".


My question is: does the org.apache.flink.table.descriptors.Json 
format support a nested JSON schema? If it does, how can I set the right 
format or schema? If not, how can I apply the Flink SQL API to a 
nested JSON data source?





Re: [DISCUSS] Introduction of a Table API Java Expression DSL

2019-03-21 Thread Timo Walther

Thanks for your feedback Rong and Jark.

@Jark: Yes, you are right that the string-based API is used quite a lot. 
On the other hand, the potential user base in the future is still bigger 
than our current user base. Because the Table API will become equally 
important as the DataStream API, we really need to fix some crucial 
design decisions before it is too late. I would suggest introducing the 
new DSL in 1.9 and removing the expression parser either in 1.10 or 1.11. 
From a development point of view, I think we can handle the overhead 
of maintaining 3 APIs until then, because 2 APIs will share the same code 
base plus the expression parser.
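Just to make the difference tangible, a purely illustrative before/after (the DSL method names are taken from the early prototype in the design document and may still change):

// Today: string-based expressions, parsed at runtime by the expression parser.
Table result = table
    .filter("amount > 10")
    .select("id, amount.sum as total");

// Proposed: parser-free Java DSL (illustrative naming only).
Table result2 = table
    .filter($("amount").isGreater(lit(10)))
    .select($("id"), $("amount").sum().as("total"));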


Regards,
Timo

Am 21.03.19 um 05:21 schrieb Jark Wu:

Hi Timo,

I'm +1 on the proposal. I like the idea of providing a Java DSL which is 
more friendly than the string-based approach in programming.


My concern is if/when we can drop the string-based expression parser. 
If it takes a very long time, we have to pay more development
cost on the three Table APIs. As far as I know, the string-based API 
is used in many companies.
We should also get some feedback from users, so I'm CCing this email 
to the user mailing list.


Best,
Jark



On Wed, 20 Mar 2019 at 08:51, Rong Rong <walter...@gmail.com> wrote:


Thanks for sharing the initiative of improving the Java-side Table
expression DSL.

I agree, as stated in the doc, that the Java DSL was always a "3rd class
citizen", and we've run into many hand-holding scenarios with our Flink
developers trying to get the Stringify syntax working.
Overall I am a +1 on this; it also helps reduce the development cost of the
Table API so that we no longer need to maintain different DSLs and
documentation.

I left a few comments in the doc, and also some features that I think will
be beneficial to the final outcome. Please kindly take a look @Timo.

Many thanks,
Rong

    On Mon, Mar 18, 2019 at 7:15 AM Timo Walther <twal...@apache.org> wrote:

> Hi everyone,
>
> some of you might have already noticed the JIRA issue that I opened
> recently [1] about introducing a proper Java expression DSL for the
> Table API. Instead of using string-based expressions, we should
aim for
> a unified, maintainable, programmatic Java DSL.
>
> Some background: The Blink merging efforts and the big
refactorings as
> part of FLIP-32 have revealed many shortcomings in the current
Table &
> SQL API design. Most of these legacy issues cause problems
nowadays in
> making the Table API a first-class API next to the DataStream
API. An
> example is the ExpressionParser class[2]. It was implemented in the
> early days of the Table API using Scala parser combinators.
During the
> last years, this parser caused many JIRA issues and user
confusion on
> the mailing list, because the exceptions and syntax might not be
> straightforward.
>
> For FLINK-11908, we added a temporary bridge instead of
reimplementing
> the parser in Java for FLIP-32. However, this is only an intermediate
> solution until we make a final decision.
>
> I would like to propose a new, parser-free version of the Java
Table API:
>
>
>

https://docs.google.com/document/d/1r3bfR9R6q5Km0wXKcnhfig2XQ4aMiLG5h2MTx960Fg8/edit?usp=sharing
>
> I already implemented an early prototype that shows that such a
DSL is
> not much implementation effort and integrates nicely with all
existing
> API methods.
>
> What do you think?
>
> Thanks for your feedback,
>
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-11890
>
> [2]
>
>

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
>
>





Re: Emitting current state to a sink

2019-04-26 Thread Timo Walther

Hi Avi,

did you have a look at the .connect() and .broadcast() API 
functionalities? They allow you to broadcast a control stream to all 
operators. Maybe this example [1] or other examples in this repository 
can help you.
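As a rough sketch of the pattern (the "Event" and "Command" types and the state descriptor below are made-up placeholders), a KeyedBroadcastProcessFunction can react to a broadcasted command and emit the current value of every key it owns via Context#applyToKeyedState:

// Sketch only: "Event" and "Command" are hypothetical types for illustration.
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class StateFetchFunction
        extends KeyedBroadcastProcessFunction<String, Event, Command, Tuple2<String, Long>> {

    private final ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("my-state", Long.class);

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // regular keyed processing that updates the ValueState registered with stateDesc
    }

    @Override
    public void processBroadcastElement(Command cmd, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // iterate over all keys of this operator instance and emit their current state
        ctx.applyToKeyedState(stateDesc, (key, state) -> out.collect(Tuple2.of(key, state.value())));
    }
}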


Regards,
Timo

[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java


Am 26.04.19 um 07:57 schrieb Avi Levi:

Hi,
We have a keyed pipeline with persisted state.
Is there a way to broadcast a command and collect all values that 
are persisted in the state?


The end result can be, for example, sending a fetch command to all 
operators and emitting the results to some sink.


Why do we need it? From time to time we might want to check whether we are 
missing keys or what the additional keys are, or simply emit the current 
state to a table and query it.


I tried simply broadcasting a command and addressing the persisted 
state, but that resulted in:
java.lang.NullPointerException: No key set. This method should not be 
called outside of a keyed context.


is there a good way to achieve that ?

Cheers
Avi





Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-26 Thread Timo Walther

Hi Juan,

as far as I know, we do not provide any concurrency guarantees for 
count() or collect(). Those methods need to be used with caution anyway, 
as the result size must not exceed a certain threshold. I will loop in 
Fabian, who might know more about the internals of the execution.


Regards,
Timo


Am 26.04.19 um 03:13 schrieb Juan Rodríguez Hortalá:

Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá wrote:


Hi,

I have a very simple program using the local execution
environment, that throws NPE and other exceptions related to
concurrent access when launching a count for a DataSet from
different threads. The program is
https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e
which is basically this:

def doubleCollectConcurrent = {
  val env = ExecutionEnvironment.createLocalEnvironment(3)
  val xs = env.fromCollection(1 to 100).map { _ + 1 }
  implicit val ec =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val pendingActions = Seq.fill(10)(
    Future { println(s"xs.count = ${xs.count}") }
  )
  val pendingActionsFinished = Future.fold(pendingActions)(Unit) { (u1, u2) =>
    println("pending action finished")
    Unit
  }
  Await.result(pendingActionsFinished, 10 seconds)

  ok
}


It looks like the issue is on OperatorTranslation.java at

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
when a sink is added to the sinks list while that list is being
traversed. I have the impression that this is by design, so I'd
like to confirm that this is the expected behaviour, and whether
this is happening only for the local execution environment, or if
this affects all execution environment implementations. Other
related questions I have are:

  * Is this documented somewhere? I'm quite new to Flink, so I
might have missed this. Is there any known workaround for
concurrently launching counts and other sink computations on
the same DataSet?
  * Is it safe performing a sequence of calls to DataSet sink
methods like count or collect, on the same DataSet, as long as
they are performed from the same thread? From my experience it
looks like it is, but I'd like to get a confirmation if possible.

This might be related to

https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
but I'm not sure.

Thanks a lot for your help.

Greetings,

Juan





Re: Serialising null value in case-class

2019-04-26 Thread Timo Walther

Hi Averell,

the reason for this lies in the internal serializer implementation. In 
general, the composite/wrapping type serializer is responsible for 
encoding nulls. The case class serializer does not support nulls, because 
Scala discourages the use of nulls and promotes `Option`. Some 
serializers such as the `String` serializer use the binary length field 
internally to encode nulls, see [1]. For a full list of Scala types, I would recommend 
this class [2].


Regards,
Timo

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/StringValue.java#L788
[2] 
https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala


Am 26.04.19 um 11:30 schrieb Averell:

Good day,

I have a case-class defined like this:

 case class MyClass(ts: Long, s1: String, s2: String, i1: Integer,  i2:
Integer)
 object MyClass {
 val EMPTY = MyClass(0L, null, null, 0, 0)
 def apply(): MyClass = EMPTY
 }

My code has been running fine (I was not aware of the limitation mentioned
in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html)

But when I tried to create the instance MyClass(0L, null, null, null,
0), I got the following error: org.apache.flink.types.NullFieldException:
Field 3 is null, but expected to hold a value.

I am confused. Why is there a difference between a null String and a null
Integer?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Serialising null value in case-class

2019-04-26 Thread Timo Walther
Currently, tuples and case classes are the most efficient data types 
because they avoid the need for special null handling. Everything else 
is hard to estimate. You might need to perform micro-benchmarks with the 
serializers you want to use if you have a very performance-critical use 
case. Object types vs. primitive types don't make a big difference (also 
for Scala) as their values are serialized by the same serializers.


I hope this helps.

Timo


Am 26.04.19 um 13:17 schrieb Averell:

Thank you Timo.

In terms of performance, does the use of Option[] cause a performance impact? I
guess it does, because there will be one more layer of object handling,
won't there?

I am also confused about choosing between primitive types (Int, Long) vs.
object types (Integer, JLong). I have seen in many places in the Flink
documentation that Java primitive types are recommended. But what about Scala types?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: POJO with private fields and toAppendStream of StreamTableEnvironment

2019-04-29 Thread Timo Walther

Hi Sung,

private fields are only supported if you specify getters and setters 
accordingly. Otherwise you need to use `Row.class` and perform the 
mapping in a subsequent map() function manually via reflection.
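A minimal sketch of the POJO variant (field names taken from the quoted code below): a public no-argument constructor plus getters and setters makes the class usable with toAppendStream.

import java.io.Serializable;

public class P implements Serializable {
    private String name;
    private Integer value;

    public P() {}                                   // default constructor required for POJOs

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Integer getValue() { return value; }
    public void setValue(Integer value) { this.value = value; }
}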


Regards,
Timo


Am 29.04.19 um 15:44 schrieb Sung Gon Yi:
In 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset, 


A POJO data type can be converted to a DataStream.

I would like to use a POJO data type class with private fields. I wonder 
whether it is officially possible or not.

Currently it does not work.

Codes:
—
CsvTableSource as = CsvTableSource.builder()
    .path("aa.csv")
    .field("name", STRING)
    .field("value", INT)
    .build();
Table aa = tEnv.fromTableSource(as);
tEnv.toAppendStream(aa, P.class);
—
public class P implements Serializable {
    private String name;
    private Integer value;
}
—

Above codes, I got below error message:
==
Exception in thread "main" org.apache.flink.table.api.TableException: 
Arity [2] of result [ArrayBuffer(String, Integer)] does not match the 
number[1] of requested type [GenericType].
at 
org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
at 
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)

at ...
==

When fields of class P are changed to “public”, it works well.
—
public class P implements Serializable {
    public String name;
    public Integer value;
}
—

Thanks,
skonmeme







Re: Table program cannot be compiled

2019-05-16 Thread Timo Walther

Hi,

too many arguments for calling a UDF can currently lead to "grows 
beyond 64 KB" errors and maybe also cause the GC exception. This is a known 
issue covered in https://issues.apache.org/jira/browse/FLINK-8921.


Could you also add the tags to the function itself? Maybe as a static 
map for constant time access outside of the eval method?
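A possible sketch of that idea (constructor-based here instead of a static map; the class and method names are placeholders): the tag names live inside the function instance, so eval() only receives the boolean values.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.ScalarFunction;

public class TaggedConditionalArrayFunction extends ScalarFunction {

    private final String[] tagNames;   // configured once at registration time

    public TaggedConditionalArrayFunction(String[] tagNames) {
        this.tagNames = tagNames;
    }

    public String[] eval(Object... values) {
        final List<String> trueItems = new ArrayList<>();
        for (int i = 0; i < values.length && i < tagNames.length; i++) {
            final Object value = values[i];
            if (value != null && (boolean) value) {
                trueItems.add(tagNames[i]);
            }
        }
        return trueItems.toArray(new String[0]);
    }
}

// e.g. tableEnv.registerFunction("conditionalArray",
//          new TaggedConditionalArrayFunction(new String[] {"tagA", "tagB"}));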


Regards,
Timo


Am 15.05.19 um 17:10 schrieb Andrey Zagrebin:

Hi, I am looping in Timo and Dawid to look at the problem.

On Tue, May 14, 2019 at 9:12 PM shkob1 wrote:


BTW, looking at past posts on this issue [1], it should have been
fixed? I'm using version 1.7.2.
Also, the recommendation was to use a custom function, though
that's exactly what I'm doing with the conditionalArray function [2].

Thanks!

[1]

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStreamCalcRule-1802-quot-grows-beyond-64-KB-when-execute-long-sql-td20832.html#a20841

[2]
public class ConditionalArrayFunction extends ScalarFunction {

    public static final String NAME = "conditionalArray";

    public String[] eval(Object... keyValues) {
        if (keyValues.length == 0) {
            return new String[]{};
        }
        final List<Object> keyValuesList = Arrays.asList(keyValues);
        List<String> trueItems = Lists.newArrayList();
        for (int i = 0; i < keyValuesList.size(); i = i + 2){
            final String key = (String)keyValuesList.get(i);
            final Object value = keyValuesList.get(i + 1);

            if (value != null && (boolean)value)
                trueItems.add(key);
        }
        return trueItems.toArray(new String[0]);
    }
}




--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Table program cannot be compiled

2019-05-20 Thread Timo Walther

Hi Shahar,

yes, the number of parameters should be the issue for a "cannot compile" 
exception. If you move most of the constants to a member of the 
function, it should actually work.


Do you have a little reproducible example somewhere?

Thanks,
Timo



Am 16.05.19 um 19:59 schrieb shkob1:

Hi Timo,

Thanks for the link.
Not sure I understand your suggestion though: is the goal here reducing the
number of parameters coming into the UDF? If that's the case, I can maybe have
the tag names there, but I still need the expressions to get evaluated before
entering eval(). Do you see this in a different way?

I tried moving the tag names to be a member within the function instead of a
parameter, but apparently I still have too many arguments.

Let me know if this is not what you meant.

Thanks!
Shahar



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Generic return type on a user-defined scalar function

2019-05-20 Thread Timo Walther

Hi Morrisa,

usually, this means that your class is not recognized as a POJO. Please 
check again the requirements of a POJO: default constructor, getters and 
setters for every field, etc. You can use 
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your 
class is a POJO or not.
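A quick way to check this (sketch; MyPojo stands for your class): Types.POJO(...) throws an exception if the class cannot be analyzed as a POJO.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Throws if MyPojo does not fulfill the POJO rules; otherwise prints the detected fields.
TypeInformation<MyPojo> info = Types.POJO(MyPojo.class);
System.out.println(info);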


I hope this helps.

Regards,
Timo


Am 16.05.19 um 23:18 schrieb Morrisa Brenner:


Hi Flink folks,


In a Flink job using the SQL API that I’m working on, I have a custom 
POJO data type with a generic field, and I would like to be able to 
call a user-defined function on this field. I included a similar 
function below with the business logic stubbed out, but this example 
has the return type I'm looking for.



I have no issues using custom functions of this type when they're used 
in a select statement and the `getResultType` method is excluded from 
the user-defined function class, but I am unable to get the type 
information to resolve correctly in contexts like order by and group 
by statements. It still doesn't work even if the `getResultType` 
method defines the specific type for a given object explicitly because 
the job compiler within Flink seems to be assuming the return type 
from the `eval` method is just an Object (type erasure...), and it 
fails to generate the object code because it's detecting invalid casts 
to the desired output type. Without the `getResultType` method, it 
just fails to detect type entirely. This seems to be fine when it's 
just a select, but if I try to make it do any operation (like group 
by) I get the following error: 
"org.apache.flink.api.common.InvalidProgramException: This type 
(GenericType) cannot be used as key."



Does anyone know if there's a way to get Flink to pay attention to the 
type information from `getResultType` when compiling the `eval` method 
so that the types work out? Or another way to work around the type 
erasure on the eval method without defining explicit user-defined 
function classes for each type?



Thanks for your help!


Morrisa




Code snippet:



package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 *
 * T should be able to be a String, long, float, boolean, or Date.
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<T> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation[] {
            TypeInformation.of(desiredType)
        };
    }
}



--
Morrisa Brenner
Software Engineer

225 Franklin St, Boston, MA 02110
klaviyo.com 





Re: Is the provided Serializer/TypeInformation checked "too late"?

2019-07-08 Thread Timo Walther

Hi Niels,

the type handling evolved over the years and is a bit messed up 
across the different layers. You are almost right with your last 
assumption "Is the provided serialization via TypeInformation 'skipped' 
during startup and only used during runtime?". The type extraction 
returns a Kryo type, and the Kryo type uses the configured default 
serializers during runtime. Therefore, the log entry is just an INFO and 
not a WARNING. And you did everything correctly.


Btw, there is also the possibility to insert a custom type into the type 
extraction by using type factories [0].
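For completeness, a rough sketch of the type factory approach from [0] (all class names are placeholders; the annotation has to go on a class you own, e.g. a small wrapper around the third-party type):

import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;

@TypeInfo(MyWrapperTypeInfoFactory.class)
public class MyWrapper {
    // wraps the third-party value, e.g. the HyperLogLog++ sketch
}

// (in its own file)
public class MyWrapperTypeInfoFactory extends TypeInfoFactory<MyWrapper> {
    @Override
    public TypeInformation<MyWrapper> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // return a custom TypeInformation whose serializer does what you want
        return new MyWrapperTypeInfo();   // placeholder for your own TypeInformation subclass
    }
}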


Maybe as a side comment: We are aware of these confusions and the Table 
& SQL API will hopefully not use the TypeExtractor anymore in 1.10. This 
is what I am working on at the moment.


Regards,
Timo

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory


Am 08.07.19 um 14:17 schrieb Niels Basjes:

Hi,

Context:
I'm looking into making the Google (BigQuery compatible) HyperLogLog++
implementation available in Flink because it is simply an
Apache-licensed open source library:
- https://issuetracker.google.com/issues/123269269
- https://issues.apache.org/jira/browse/BEAM-7013
- https://github.com/google/zetasketch

While doing this I noticed that even though I provided an explicit
Kryo Serializer for the core class

i.e. I did 
senv.getConfig().registerTypeWithKryoSerializer(HyperLogLogPlusPlus.class,
HLLSerializer.class);

I still see messages like this when registering a new
UserDefinedFunction (AggregateFunction / ScalarFunction) that has this
class as either input of output:

13:59:57,316 [INFO ] TypeExtractor   : 1815:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
getter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor   : 1818:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
setter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor   : 1857:
Class class com.google.zetasketch.HyperLogLogPlusPlus cannot be used
as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance.

So it is complaining about the serialization performance when done in
a different way than was configured.

Then I noticed that I see similar messages in other situations too.

In this code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/DemonstrationOfTumblingTableSQLFunction.java#L165

I see
13:59:58,478 [INFO ] TypeExtractor   : 1815:
class org.apache.flink.types.Row does not contain a getter for field
fields
13:59:58,478 [INFO ] TypeExtractor   : 1818:
class org.apache.flink.types.Row does not contain a setter for field
fields
13:59:58,479 [INFO ] TypeExtractor   : 1857:
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

even though a full TypeInformation instance for that type was provided

TypeInformation<Row> tupleType = new RowTypeInfo(SQL_TIMESTAMP,
STRING, STRING, STRING, STRING, LONG);
DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

I checked with my debugger and the code IS using the correct serialization
classes for both mentioned examples when running.

So what is happening here?
Did I forget to do a required call?
So is this a bug?
Is the provided serialization via TypeInformation 'skipped' during
startup and only used during runtime?





Re: Flink Table API and Date fields

2019-07-08 Thread Timo Walther

Hi Flavio,

yes, I agree. This check is a bit confusing. The initial reason for it 
was that sql.Time, sql.Date, and sql.Timestamp extend from util.Date as 
well. But handling it as a generic type, as Jingsong mentioned, might be 
the better option in order to write custom UDFs to handle them.
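As a sketch of such a UDF (illustrative only; depending on the planner version you may additionally need to override getParameterTypes()/getResultType() so the generic java.util.Date parameter is accepted):

import org.apache.flink.table.functions.ScalarFunction;

// Converts a java.util.Date field into a java.sql.Timestamp that the Table API understands.
public class DateToTimestamp extends ScalarFunction {
    public java.sql.Timestamp eval(java.util.Date date) {
        return date == null ? null : new java.sql.Timestamp(date.getTime());
    }
}

// e.g. tableEnv.registerFunction("toTs", new DateToTimestamp());
//      SELECT toTs(myDateField) FROM myTable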


Regards,
Timo

Am 08.07.19 um 12:04 schrieb Flavio Pompermaier:
Of course there are java.sql.* and java.time.* in Java, but it's also 
true that most of the time the POJOs you read come from an external 
(Maven) lib, and if such POJOs contain date fields you have to create a 
local version of that POJO with the java.util.Date fields replaced 
by a java.sql.Date version of them.
Moreover, you also have to create a conversion function from the 
original POJO to the Flink-specific one (and this is very 
annoying, especially because if the POJO gets modified you have to 
check that your conversion function is updated accordingly).


Summarising, it is possible to work around this limitation but it's 
very uncomfortable (IMHO)


On Mon, Jul 8, 2019 at 11:52 AM JingsongLee wrote:


The Flink 1.9 Blink runner will support it as a generic type,
but I don't recommend it. After all, there are java.sql.Date and
java.time.* in Java.

Best, JingsongLee

--
From: Flavio Pompermaier <pomperma...@okkam.it>
Send Time: Monday, 8 July 2019, 15:40
To: JingsongLee <lzljs3620...@aliyun.com>
Cc: user <user@flink.apache.org>
Subject:Re: Flink Table API and Date fields

I think I could do it for this specific use case, but isn't
this a big limitation of the Table API?
I think that java.util.Date should be a first-class citizen in
Flink.

Best,
Flavio

On Mon, Jul 8, 2019 at 4:06 AM JingsongLee <lzljs3620...@aliyun.com> wrote:
Hi Flavio:
It looks like you use java.util.Date in your POJO. Currently the Flink
Table API does not support BasicTypeInfo.DATE_TYPE_INFO
because of the limitations of some checks in the code.
Can you use java.sql.Date?

Best, JingsongLee

--
From: Flavio Pompermaier <pomperma...@okkam.it>
Send Time: Friday, 5 July 2019, 22:52
To: user <user@flink.apache.org>
Subject:Flink Table API and Date fields

Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use Table API I get the following error:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation
failed. Type is not supported: Date
at

org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
at

org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
at

org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
at

org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is
not supported: Date
at

org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date
field to a Long one?

Best,
Flavio







Re: 1.9 Release Timeline

2019-07-23 Thread Timo Walther

Hi Oytun,

the community is working hard to release 1.9. You can see the progress 
here [1] and on the dev@ mailing list.


[1] 
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=328&view=detail


Regards,
Timo

Am 23.07.19 um 15:52 schrieb Oytun Tez:

Ping, any estimates?

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com  — www.motaword.com 




On Thu, Jul 18, 2019 at 11:07 AM Oytun Tez wrote:


Hi team,

1.9 is bringing very exciting updates, State Processor API and
MapState migrations being two of them. Thank you for all the hard
work!

I checked the burndown board [1], do you have an estimated
timeline for the GA release of 1.9?



[1]

https://issues.apache.org/jira/secure/RapidBoard.jspa?projectKey=FLINK&rapidView=328

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com  — www.motaword.com





