Re: [Question] enable end2end Kafka exactly once processing

2020-03-01 Thread Arvid Heise
Hi Eleanore,

The Flink runner is maintained by the Beam developers, so it's best to ask
on their user list.

The documentation is, however, very clear. "Flink runner is one of the
runners whose checkpoint semantics are not compatible with current
implementation (hope to provide a solution in near future)."
So Beam uses a different approach to EOS than Flink, and there is currently
no way around it. Maybe you could use Flink's EOS Kafka sink directly and
plug it into Beam somehow.

I'm not aware of any work with the Beam devs to actually make it work.
Independently, we have started to improve our interfaces for two-phase commit
sinks (which is our approach). It might coincidentally help Beam.
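
For readers unfamiliar with the term, here is a rough sketch of the two-phase
commit idea in plain Java. It is only an illustration under my own
assumptions: ToySink and its method names are hypothetical and are neither
Flink's nor Beam's actual interfaces. Records are buffered in an open
transaction, sealed when a checkpoint is taken (phase 1), and only become
visible once the checkpoint is confirmed (phase 2).

```java
import java.util.ArrayList;
import java.util.List;

class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for an external system that only exposes committed records. */
    static final class ToySink {
        private final List<String> committed = new ArrayList<>();
        private List<String> open = new ArrayList<>();   // transaction currently being written
        private List<String> preCommitted = null;        // sealed, waiting for confirmation

        void write(String record) {
            open.add(record);
        }

        /** Phase 1 (checkpoint taken): seal the open transaction and start a new one. */
        void preCommit() {
            if (preCommitted != null) {
                throw new IllegalStateException("previous transaction not finished");
            }
            preCommitted = open;
            open = new ArrayList<>();
        }

        /** Phase 2 (checkpoint confirmed): publish the sealed transaction. */
        void commit() {
            if (preCommitted != null) {
                committed.addAll(preCommitted);
                preCommitted = null;
            }
        }

        /** Failure before confirmation: drop everything that was not committed. */
        void abort() {
            preCommitted = null;
            open = new ArrayList<>();
        }

        /** Returns a copy of the records that readers of the sink can see. */
        List<String> visible() {
            return new ArrayList<>(committed);
        }
    }

    public static void main(String[] args) {
        ToySink sink = new ToySink();
        sink.write("a");
        sink.preCommit();
        System.out.println(sink.visible()); // nothing is visible before commit()
        sink.commit();
        System.out.println(sink.visible());
    }
}
```

The point of the split is that a failure between the two phases can still be
rolled back with abort(), so readers never see records from a checkpoint that
did not complete.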

Best,

Arvid

On Sun, Mar 1, 2020 at 8:23 PM Jin Yi  wrote:

> Hi experts,
>
> My application uses Apache Beam with Flink as the runner. My source and
> sink are Kafka topics, and I am using the KafkaIO connector provided by
> Apache Beam to consume and publish.
>
> I am reading through Beam's java doc:
> https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>
> It looks like Beam does not support the Flink runner for EOS. Can someone
> please shed some light on how to enable exactly-once processing with
> Apache Beam?
>
> Thanks a lot!
> Eleanore
>


Schema registry deserialization: Kryo NPE error

2020-03-01 Thread Nitish Pant
Hi all,

I am trying to use Flink to read Avro data from Kafka, where the schemas are
stored in the Kafka schema registry. Since the Kafka producer is a totally
different service (an MQTT consumer that sinks to Kafka), I can't have the
schema with me at the consumer end. I read around and arrived at the following
implementation of KeyedDeserializationSchema, but I cannot understand why it
throws a `com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException`

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._

class AvroDeserializationSchema(schemaRegistryUrl: String)
    extends KeyedDeserializationSchema[GenericRecord] {

  // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
  @transient lazy val valueDeserializer = {
    val deserializer = new KafkaAvroDeserializer(
      new CachedSchemaRegistryClient(
        schemaRegistryUrl,
        AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
    deserializer.configure(
      Map(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
      false) // isKey = false: this deserializer handles record values
    deserializer
  }

  override def isEndOfStream(nextElement: GenericRecord): Boolean = false

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
      topic: String, partition: Int, offset: Long): GenericRecord = {

    // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

    value
  }

  override def getProducedType: TypeInformation[GenericRecord] =
    TypeExtractor.getForClass(classOf[GenericRecord])
}

I have no clue how to go about solving this. I saw a lot of people trying to 
implement the same. If someone can guide me, it’d be really helpful.

Thanks!
Nitish

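Re: Source parallelism problem when reading Hive with Flink 1.10.0

2020-03-01 Thread like
Thank you very much! After turning off automatic inference, I can now control
the source parallelism. Automatic inference can run into the problem that the
job fails to start because of insufficient resources.

On March 2, 2020 at 15:18, JingsongLee wrote:
Hi,

In 1.10, the Hive source infers its parallelism automatically. You can control
it by putting the following options into flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is used by default)
- table.exec.hive.infer-source-parallelism.max=1000 (maximum parallelism chosen by automatic inference)

The sink's parallelism defaults to that of its upstream operator; if there is
a shuffle, the globally configured parallelism is used.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020 14:58
To:user-zh@flink.apache.org 
Subject: Source parallelism problem when reading Hive with Flink 1.10.0

Hi everyone,

I am using Flink 1.10.0 to read a Hive table and set a global parallelism at
startup. When reading, however, I found that the sink parallelism respects
this setting while the source parallelism does not: it changes with the size
of the source and goes up to 1000 when the source is large. How should I
handle this parallelism?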

Re: Flink remote batch execution in dynamic cluster

2020-03-01 Thread Antonio Martínez Carratalá
Thank you Piotrek, I will check those options. I only have a standalone
cluster, so any of them would need to be set up.

On Fri, Feb 28, 2020 at 2:12 PM Piotr Nowojski  wrote:

> Hi,
>
> I guess it depends on what you already have available in your cluster; try
> to use that. Running Flink in an existing Yarn cluster is very easy, but
> setting up a Yarn cluster in the first place, even if it's easy (I'm not
> sure whether that's the case), would add extra complexity.
>
> When I'm spawning an AWS cluster for testing, I use EMR with Yarn
> included, and I think that's very easy to do, as everything works out of the
> box. I've heard that Kubernetes/Docker are just as easy. I'm also not a
> devops person, but I've heard that my colleagues, if they have any
> preference, usually prefer Kubernetes.
>
> Keep in mind that I need to run the job with
> ExecutionEnvironment.createRemoteEnvironment(); uploading a jar is not a
> valid option for me. It seems to me that not all the options support remote
> submission of jobs, but I'm not sure.
>
>
> I think all of them should support the remote environment. Standalone,
> Yarn, Kubernetes and Docker almost certainly do.
>
> Piotrek
>
> On 28 Feb 2020, at 10:25, Antonio Martínez Carratalá <
> amarti...@alto-analytics.com> wrote:
>
> Hello
>
> I'm working on a project with Flink 1.8. I'm running my code from Java
> against a remote Flink cluster as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/cluster_execution.html
> That part is working, but I want to configure a dynamic Flink cluster to
> execute the jobs.
>
> Imagine I have users who sometimes need to run a report. The report is
> generated from data processed in Flink, so whenever a user requests one I
> have to submit a job to a remote Flink cluster. The job is heavy and may
> take 1 hour to finish.
>
> So I don't want to have 3, 4, 5... Task Managers always running in the
> cluster: sometimes they are idle, and other times I don't have enough Task
> Managers for all the requests. I want to create Task Managers dynamically
> as jobs arrive at the Job Manager, and get rid of them when the jobs finish.
>
> I see a lot of options for creating a cluster in
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ section
> [Deployment & Operations] [Clusters & Deployment], such as Standalone, YARN,
> Mesos, Docker, Kubernetes... but I don't know which would be the most
> suitable for my use case. I'm not an expert in devops and I barely know
> these technologies.
>
> Some advice on which technology to use, and maybe some examples, would be
> really appreciated.
>
> Keep in mind that I need to run the job with
> ExecutionEnvironment.createRemoteEnvironment(); uploading a jar is not a
> valid option for me. It seems to me that not all the options support remote
> submission of jobs, but I'm not sure.
>
> Thank you
>
> Antonio Martinez
>
>
>



Re: How JobManager and TaskManager find each other?

2020-03-01 Thread KristoffSC
Thanks for the clarification about NAT.

Moving the NAT issue aside for a moment:

Is the process of sending the "task deployment descriptor" that you mentioned
in "Feb 26, 2020; 4:18pm", and especially the process of notifying a
TaskManager about the IPs of the other TaskManagers participating in the job,
described somewhere? I'm familiar with [1] and [2], but they contain no
information about sending the IP information of TaskManagers.


Another question is how this all adds up for a Kubernetes Job or Session
Cluster deployment, where nodes are deployed across many physical machines
inside the Kubernetes cluster, if I'm using Kubernetes as described in [3].

The final question is: do I have to modify jobmanager.rpc.address and the
flink/conf/slaves file when running a Docker JobCluster on Kubernetes? The
default values are localhost.
Or will just following [3] be fine?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
[3]
https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes





Re: Question about runtime filter

2020-03-01 Thread JingsongLee
Hi,

Does the probe side of the runtime filter wait for the runtime filter to be
built? Can you check the start times of the build side and the probe side?
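
To illustrate why the ordering matters, here is a small plain-Java sketch. It
is not the Blink or Flink implementation; the class and method names are made
up, and it assumes that a probe side which starts before the filter exists
simply lets every record through. Under that assumption, the number of
records sent would look exactly like a run without a runtime filter.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

class RuntimeFilterSketch {

    /** Tiny Bloom filter over long keys, using two hash positions per key. */
    static final class BloomFilter {
        private final BitSet bits;
        private final int size;

        BloomFilter(int size) {
            this.size = size;
            this.bits = new BitSet(size);
        }

        private int h1(long key) {
            return Math.floorMod(Long.hashCode(key * 0x9E3779B97F4A7C15L), size);
        }

        private int h2(long key) {
            return Math.floorMod(Long.hashCode(Long.rotateLeft(key, 31) * 0xC2B2AE3D27D4EB4FL), size);
        }

        /** Build side: register a join key. */
        void add(long key) {
            bits.set(h1(key));
            bits.set(h2(key));
        }

        /** May return false positives, never false negatives. */
        boolean mightContain(long key) {
            return bits.get(h1(key)) && bits.get(h2(key));
        }
    }

    /**
     * Probe side. A null filter models "filter not built yet": nothing can be
     * dropped, so all keys pass through unchanged (an assumption of this sketch).
     */
    static List<Long> probe(BloomFilter filterOrNull, List<Long> probeKeys) {
        if (filterOrNull == null) {
            return new ArrayList<>(probeKeys);
        }
        List<Long> kept = new ArrayList<>();
        for (long key : probeKeys) {
            if (filterOrNull.mightContain(key)) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Long> probeKeys = List.of(1L, 2L, 3L, 4L, 5L);
        BloomFilter filter = new BloomFilter(1 << 16);
        filter.add(2L);
        filter.add(4L);
        System.out.println("no filter yet: " + probe(null, probeKeys).size() + " records");
        System.out.println("with filter:   " + probe(filter, probeKeys).size() + " records");
    }
}
```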

Best,
Jingsong Lee


--
From:faaron zheng 
Send Time: Monday, March 2, 2020 14:55
To:user 
Subject:Question about runtime filter

Hi, everyone

These days I am trying to implement a runtime filter in Flink 1.10 with
flink-sql-benchmark, following Blink. I mainly changed three parts of the
Flink code: added a runtime filter rule; modified the code generation and the
Bloom filter; and added some aggregatedaccumulator methods modeled on
accumulator. Now the runtime filter seems to work in the execution graph, as
follows:
Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date, i_rec_end_date, 
i_item_desc, i_current_price, i_wholesale_cost, i_brand_id, i_brand, 
i_class_id, i_class, i_category_id, i_category, i_manufact_id, i_manufact, 
i_size, i_formulation, i_color, i_units, i_container, i_manager_id, 
i_product_name) TablePath: tpcds_bin_orc_2.item, PartitionPruned: false, 
PartitionNums: null, ProjectedFields: [0, 10, 12] -> Calc(select=[i_item_sk], 
where=[((i_category = _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])  

and

Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq, d_week_seq, 
d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year, d_fy_quarter_seq, 
d_fy_week_seq, d_day_name, d_quarter_name, d_holiday, d_weekend, 
d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly, d_same_day_lq, 
d_current_day, d_current_week, d_current_month, d_current_quarter, 
d_current_year) TablePath: tpcds_bin_orc_2.date_dim, PartitionPruned: false, 
PartitionNums: null, ProjectedFields: [0, 3] -> Calc(select=[d_date_sk, 
d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])  


However, the number of records sent is the same as without the filter. Can
anyone give me some advice?



Thanks 

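Re: Source parallelism problem when reading Hive with Flink 1.10.0

2020-03-01 Thread JingsongLee
Hi,

In 1.10, the Hive source infers its parallelism automatically. You can control
it by putting the following options into flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is used by default)
- table.exec.hive.infer-source-parallelism.max=1000 (maximum parallelism chosen by automatic inference)

The sink's parallelism defaults to that of its upstream operator; if there is
a shuffle, the globally configured parallelism is used.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020 14:58
To:user-zh@flink.apache.org 
Subject: Source parallelism problem when reading Hive with Flink 1.10.0

Hi everyone,

I am using Flink 1.10.0 to read a Hive table and set a global parallelism at
startup. When reading, however, I found that the sink parallelism respects
this setting while the source parallelism does not: it changes with the size
of the source and goes up to 1000 when the source is large. How should I
handle this parallelism?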

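Source parallelism problem when reading Hive with Flink 1.10.0

2020-03-01 Thread like
Hi everyone,

I am using Flink 1.10.0 to read a Hive table and set a global parallelism at
startup. When reading, however, I found that the sink parallelism respects
this setting while the source parallelism does not: it changes with the size
of the source and goes up to 1000 when the source is large. How should I
handle this parallelism?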

Question about runtime filter

2020-03-01 Thread faaron zheng
Hi, everyone

These days I am trying to implement a runtime filter in Flink 1.10 with
flink-sql-benchmark, following Blink. I mainly changed three parts of the
Flink code: added a runtime filter rule; modified the code generation and the
Bloom filter; and added some aggregatedaccumulator methods modeled on
accumulator. Now the runtime filter seems to work in the execution graph, as
follows:
Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date,
i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id,
i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id,
i_manufact, i_size, i_formulation, i_color, i_units, i_container,
i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item,
PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12]
-> Calc(select=[i_item_sk], where=[((i_category =
_UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])

and

Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq,
d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year,
d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday,
d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly,
d_same_day_lq, d_current_day, d_current_week, d_current_month,
d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim,
PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] ->
Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])


However, the number of records sent is the same as without the filter. Can
anyone give me some advice?



Thanks


Re: Get Tumbling Window Top-K using SQL

2020-03-01 Thread Jark Wu
Hi Weizheng,

You are right. You can use the TopN feature in the Blink planner. But note
that it doesn't support tumbling-window TopN; it is a TopN without windowing
and event time.
But you can achieve it with PARTITION BY on a column. The column could be a
preprocessed column that represents which window the row belongs to, e.g. for
1-hour windowing: "2020-03-02 10:00", "2020-03-02 11:00".

Tumbling-window TopN will be natively supported in the future.
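
The same idea can be sketched outside of SQL, in plain Java, to show what the
preprocessed column does. This is only an illustration: the Row class, its
fields and the method names are made up for the example, and it assumes
1-hour windows. Each row is tagged with the start of its window, rows are
partitioned by that value, and the N highest scores are kept per partition.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowTopN {

    /** Hypothetical input row: an event timestamp, an item name and the value to rank by. */
    static final class Row {
        final LocalDateTime ts;
        final String item;
        final long score;

        Row(LocalDateTime ts, String item, long score) {
            this.ts = ts;
            this.item = item;
            this.score = score;
        }
    }

    /** The "preprocessed column": start of the 1-hour window the row belongs to. */
    static LocalDateTime windowStart(LocalDateTime ts) {
        return ts.truncatedTo(ChronoUnit.HOURS);
    }

    /** Partition rows by window start, then keep the n highest-scoring items per window. */
    static Map<LocalDateTime, List<String>> topN(List<Row> rows, int n) {
        Map<LocalDateTime, List<Row>> byWindow = new TreeMap<>();
        for (Row r : rows) {
            byWindow.computeIfAbsent(windowStart(r.ts), k -> new ArrayList<>()).add(r);
        }
        Map<LocalDateTime, List<String>> result = new TreeMap<>();
        for (Map.Entry<LocalDateTime, List<Row>> e : byWindow.entrySet()) {
            List<Row> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.comparingLong((Row r) -> r.score).reversed());
            List<String> top = new ArrayList<>();
            for (Row r : sorted.subList(0, Math.min(n, sorted.size()))) {
                top.add(r.item);
            }
            result.put(e.getKey(), top);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row(LocalDateTime.of(2020, 3, 2, 10, 5), "a", 5),
                new Row(LocalDateTime.of(2020, 3, 2, 10, 40), "b", 1),
                new Row(LocalDateTime.of(2020, 3, 2, 10, 59), "c", 9),
                new Row(LocalDateTime.of(2020, 3, 2, 11, 0), "d", 2));
        System.out.println(topN(rows, 2));
    }
}
```

In SQL terms, windowStart plays the role of the PARTITION BY column and the
per-window sort plus cut-off plays the role of ROW_NUMBER() with
rownum <= N.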

Best,
Jark

On Mon, 2 Mar 2020 at 10:55, Lu Weizheng  wrote:

> Sorry guys,
>
> I found a solution on the wiki about Top-N using the Blink planner.
>
> SELECT [column_list]
> FROM (
>    SELECT [column_list],
>      ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>        ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
>    FROM table_name)
> WHERE rownum <= N [AND conditions]
>
>
> thanks anyway.
> --
> *From:* Lu Weizheng 
> *Sent:* March 1, 2020 17:48
> *To:* user@flink.apache.org 
> *Subject:* Get Tumbling Window Top-K using SQL
>
> Hi,
>
> I found a question on StackOverflow (
> https://stackoverflow.com/questions/49191326/flink-stream-sql-order-by)
> about how to get Top-K using Flink SQL. The answer there was written by
> Fabian back in 2018.
> The main idea is to use RANK to get the Top K of field 'a':
>
> SELECT a, b, c
> FROM (
>   SELECT
>     a, b, c,
>     RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE)
>       BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank
>   FROM yourTable)
> WHERE rank <= 10
>
> Is there a better way to get tumbling-window Top-K items now?
>
> Also, the wiki page on dynamic tables may need an update:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/dynamic_tables.html
>
> On that page, I don't know why the query has a field 'lastLogin':
>
> SELECT user, RANK() OVER (ORDER BY lastLogin)
> FROM (
>   SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user);
>
>
> Thanks!
>


Re: Is CSV format supported for Kafka in Flink 1.10?

2020-03-01 Thread Jark Wu
Hi Kant,

CSV is supported for Kafka, but you need to download the flink-csv SQL jar [1]
and load it into the SQL CLI using `--library`, because the CSV format factory
is implemented in a separate module and is not bundled by default.

[1]:
https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar

On Sun, 1 Mar 2020 at 03:48, kant kodali  wrote:

> Hi,
>
> Is CSV format supported for Kafka in Flink 1.10? It says I need to specify
> connector.type as Filesystem but documentation says it is supported for
> Kafka?
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.Row;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend(
>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment =
>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>         tableEnvironment
>                 .connect(
>                         new Kafka()
>                                 .version("0.11")
>                                 .topic("test-topic1")
>                 )
>                 .withFormat(new Csv())
>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>                 .inAppendMode()
>                 .createTemporaryTable("kafka_source");
>
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
>         tableEnvironment.execute("Sample Job");
>     }
> }
>
>
> This code generates the following error
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
>
> The following properties are requested:
> connector.property-version=1
> connector.topic=test-topic1
> connector.type=kafka
> connector.version=0.11
> format.property-version=1
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=f0
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> at
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
> ... 34 more
>
>
>


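Re: Hive Source With Kerberos authentication problem

2020-03-01 Thread Rui Li
Judging from the log you posted, it looks like an embedded metastore was
created. Could you check whether HiveCatalog is picking up an incorrect hive
conf? Also, are all the Maven dependencies you posted packaged into your Flink
job jar? Dependencies such as datanucleus should not be needed.

On Sat, Feb 29, 2020 at 10:42 PM 叶贤勋  wrote:

> Hi 李锐, thanks for your reply.
> The earlier problem has been solved by setting yarn.resourcemanager.principal.
> But now another problem has come up; please help take a look.
>
> Background: the Flink job still uses a Kerberos-secured Hive as its source.
> The same code passes Kerberos authentication when tested locally and can
> query and insert data into Hive, but once the job is submitted to the
> cluster it reports a Kerberos authentication failure.
> Flink: 1.9.1; flink-1.9.1/lib/ contains flink-dist_2.11-1.9.1.jar,
> flink-shaded-hadoop-2-uber-2.7.5-7.0.jar, log4j-1.2.17.jar,
> slf4j-log4j12-1.7.15.jar
> Hive: 2.1.1
> Main jars the Flink job depends on: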
> [INFO] +- org.apache.flink:flink-table-api-java:jar:flink-1.9.1:compile
> [INFO] |  +- org.apache.flink:flink-table-common:jar:flink-1.9.1:compile
> [INFO] |  |  \- org.apache.flink:flink-core:jar:flink-1.9.1:compile
> [INFO] |  | +-
> org.apache.flink:flink-annotations:jar:flink-1.9.1:compile
> [INFO] |  | +-
> org.apache.flink:flink-metrics-core:jar:flink-1.9.1:compile
> [INFO] |  | \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
> [INFO] |  |+- com.esotericsoftware.minlog:minlog:jar:1.2:compile
> [INFO] |  |\- org.objenesis:objenesis:jar:2.1:compile
> [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [INFO] |  \- org.apache.flink:force-shading:jar:1.9.1:compile
> [INFO] +-
> org.apache.flink:flink-table-planner-blink_2.11:jar:flink-1.9.1:compile
> [INFO] |  +-
> org.apache.flink:flink-table-api-scala_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.12:compile
> [INFO] |  |  \- org.scala-lang:scala-compiler:jar:2.11.12:compile
> [INFO] |  +-
> org.apache.flink:flink-table-api-java-bridge_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  +- org.apache.flink:flink-java:jar:flink-1.9.1:compile
> [INFO] |  |  \-
> org.apache.flink:flink-streaming-java_2.11:jar:1.9.1:compile
> [INFO] |  +-
> org.apache.flink:flink-table-api-scala-bridge_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  \- org.apache.flink:flink-scala_2.11:jar:flink-1.9.1:compile
> [INFO] |  +-
> org.apache.flink:flink-table-runtime-blink_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  +- org.codehaus.janino:janino:jar:3.0.9:compile
> [INFO] |  |  \- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
> [INFO] |  \- org.reflections:reflections:jar:0.9.10:compile
> [INFO] +- org.apache.flink:flink-table-planner_2.11:jar:flink-1.9.1:compile
> [INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
> [INFO] +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:compile
> [INFO] |  +- org.scala-lang:scala-library:jar:2.11.8:compile
> [INFO] |  +- com.typesafe:config:jar:1.3.3:compile
> [INFO] |  \-
> org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
> [INFO] +- org.apache.flink:flink-sql-client_2.11:jar:1.9.1:compile
> [INFO] |  +- org.apache.flink:flink-clients_2.11:jar:1.9.1:compile
> [INFO] |  |  \- org.apache.flink:flink-optimizer_2.11:jar:1.9.1:compile
> [INFO] |  +- org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1:compile
> [INFO] |  +- log4j:log4j:jar:1.2.17:compile
> [INFO] |  \- org.apache.flink:flink-shaded-jackson:jar:2.9.8-7.0:compile
> [INFO] +- org.apache.flink:flink-json:jar:1.9.1:compile
> [INFO] +- org.apache.flink:flink-csv:jar:1.9.1:compile
> [INFO] +- org.apache.flink:flink-hbase_2.11:jar:1.9.1:compile
> [INFO] +- org.apache.hbase:hbase-server:jar:2.2.1:compile
> [INFO] |  +-
> org.apache.hbase.thirdparty:hbase-shaded-protobuf:jar:2.2.1:compile
> [INFO] |  +-
> org.apache.hbase.thirdparty:hbase-shaded-netty:jar:2.2.1:compile
> [INFO] |  +-
> org.apache.hbase.thirdparty:hbase-shaded-miscellaneous:jar:2.2.1:compile
> [INFO] |  |  \-
> com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
> [INFO] |  +- org.apache.hbase:hbase-common:jar:2.2.1:compile
> [INFO] |  |  \-
> com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
> [INFO] |  +- org.apache.hbase:hbase-http:jar:2.2.1:compile
> [INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:9.3.27.v20190418:compile
> [INFO] |  |  +-
> org.eclipse.jetty:jetty-util-ajax:jar:9.3.27.v20190418:compile
> [INFO] |  |  +- org.eclipse.jetty:jetty-http:jar:9.3.27.v20190418:compile
> [INFO] |  |  +-
> org.eclipse.jetty:jetty-security:jar:9.3.27.v20190418:compile
> [INFO] |  |  +- org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile
> [INFO] |  |  |  +-
> org.glassfish.jersey.core:jersey-common:jar:2.25.1:compile
> [INFO] |  |  |  |  +-
> org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.25.1:compile
> [INFO] |  |  |  |  \-
> org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile
> [INFO] |  |  |  +-
> org.glassfish.jersey.core:jersey-client:jar:2.25.1:compile
> [INFO] |  |  |  +-
> org.glassfish.jersey.media:jersey-media-jaxb:jar:2.25.1:compile
> [INFO] |  |  |  +- javax.annotation:javax.annotation-api:jar:1.2:compile
> [INFO] |  |  |  +- org.glassfish.hk2:hk2-api:jar:2.5.0-b32:compile
> [INFO] |  |  |  |  +- org.glassfish.hk2:hk2-utils:jar:2.5.0-b32:compile
> [INFO] |  |  |  |  \-
> 

Re:Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin







create table lscsp_sc_order_all (
  amount varchar  ,
  argType varchar,
  balance varchar,
  branchNo varchar  ,
  businessType varchar ,
  channelType varchar ,
  counterOrderNo varchar  ,
  counterRegisteredDate varchar,
  custAsset varchar  ,
  customerNumber varchar,
  customerType varchar,
  discountId varchar,
  doubleRecordFlag varchar,
  doubleRecordType varchar,
  exceedFlag varchar,
  fundAccount varchar,
  fundCode varchar,
  fundCompany varchar,
  fundName varchar,
  fundRecruitmentFlag varchar,
  id varchar,
  lastUpdateTime varchar,
  opBranchNo varchar,
  opStation varchar,
  orderNo varchar,
  orgEntrustNo varchar,
  orgOrderNo varchar,
  prodId varchar,
  prodInvestorType varchar,
  prodLeafType varchar,
  prodRisk varchar,
  prodRiskFlag varchar,
  prodRootType varchar,
  prodTerm varchar,
  prodVariety varchar,
  quaInvestorFlag varchar,
  quaInvestorSource varchar,
  quickPurchaseFlag varchar,
  remark varchar,
  remark1 varchar,
  remark2 varchar,
  remark3 varchar,
  riskFlag varchar,
  scRcvTime varchar,
  scSendTime varchar,
  signId varchar,
  signSpecialRiskFlag varchar,
  source varchar,
  status varchar,
  subRiskFlag varchar,
  sysNodeId varchar,
  taSerialNo varchar,
  termFlag varchar,
  token varchar,
  tradeConfirmDate varchar,
  transFundCode varchar,
  transProdId varchar,
  varietyFlag varchar,
  zlcftProdType varchar,
  proctime as PROCTIME()   -- computed column that produces a processing-time attribute
)
with
(
  'connector.type' = 'kafka',  -- use the Kafka connector
  'connector.version' = '0.10',  -- Kafka version; 'universal' supports 0.11 and later
  'connector.topic' = '',  -- Kafka topic
  'connector.startup-mode' = 'group-offsets',  -- start reading from the committed group offsets
  'connector.properties.zookeeper.connect' = '',  -- ZooKeeper address
  'connector.properties.bootstrap.servers' = '',  -- Kafka broker address
  'connector.properties.group.id' = 'acrm-realtime-saleorder-consumer-1',
  'format.type' = 'json'  -- the source format is JSON
)







CREATE TABLE dim_app_cust_info (
cust_id varchar ,
open_comp_name varchar ,
open_comp_id varchar ,
org_name varchar ,
org_id varchar,
comp_name varchar ,
comp_id varchar ,
mng_name varchar ,
mng_id varchar ,
is_tg varchar ,
cust_name varchar ,
cust_type varchar,
avg_tot_aset_y365 double ,
avg_aset_create_y double
) WITH (
'connector.type' = 'jdbc',
'connector.url' = '',
'connector.table' = 'app_cust_serv_rel_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'admin',
'connector.password' = 'Windows7',
'connector.lookup.cache.max-rows' = '8000',
'connector.lookup.cache.ttl' = '30min',
'connector.lookup.max-retries' = '3'
)








At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>Could you also provide us the DDL for lscsp_sc_order_all
>and dim_app_cust_info ?
>
>sunfulin wrote on Sunday, March 1, 2020 at 9:22 PM:
>
>>
>> CREATE TABLE realtime_product_sell (
>>   sor_pty_id varchar,
>>   entrust_date varchar,
>>   entrust_time varchar,
>>   product_code varchar,
>>   business_type varchar,
>>   balance double,
>>   cust_name varchar,
>>   open_comp_name varchar,
>>   open_comp_id varchar,
>>   org_name varchar,
>>   org_id varchar,
>>   comp_name varchar,
>>   comp_id varchar,
>>   mng_name varchar,
>>   mng_id varchar,
>>   is_tg varchar,
>>   cust_type varchar,
>>   avg_tot_aset_y365 double,
>>   avg_aset_create_y double
>> ) WITH (
>>   'connector.type' = 'elasticsearch',
>>   'connector.version' = '',
>>   'connector.hosts' = '',
>>   'connector.index' = 'realtime_product_sell_007118',
>>   'connector.document-type' = '_doc',
>>   'update-mode' = 'upsert',
>>   'connector.key-delimiter' = '$',
>>   'connector.key-null-literal' = 'n/a',
>>   'connector.bulk-flush.interval' = '1000',
>>   'format.type' = 'json'
>> )
>>
>>
>>
>>
>>
>> At 2020-03-01 21:08:08, "Benchao Li"  wrote:
>> >The UDF looks good. Could you also paste your DDL? Then we can reproduce
>> >your bug easily.
>> >
>> >sunfulin wrote on Sunday, March 1, 2020 at 6:39 PM:
>> >
>> >> Below is the code. The function converts the original field timeStr
>> >> ("2020-03-01 12:01:00.234") to the target timeStr according to dayTag.
>> >>
>> >> public class ts2Date extends ScalarFunction {
>> >>     public ts2Date() {
>> >>     }
>> >>
>> >>     public String eval(String timeStr, boolean dayTag) {
>> >>         if (timeStr == null) {
>> >>             return null;
>> >>         }
>> >>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>> >>         Date date = new Date();
>> >>         try {
>> >>             date = ortSf.parse(timeStr);
>> >>         } catch (ParseException e) {
>> >>             e.printStackTrace();
>> >>             return null;
>> >>         }
>> >>         if (dayTag) {
>> >>             String format = "yyyy-MM-dd";
>> >>             SimpleDateFormat sf = new SimpleDateFormat(format);
>> >>             return sf.format(date);
>> >>         } else {
>> >>             String format = 

Re:Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin







create table lscsp_sc_order_all (
  amount varchar  ,
  argType varchar,
  balance varchar,
  branchNo varchar  ,
  businessType varchar ,
  channelType varchar ,
  counterOrderNo varchar  ,
  counterRegisteredDate varchar,
  custAsset varchar  ,
  customerNumber varchar,
  customerType varchar,
  discountId varchar,
  doubleRecordFlag varchar,
  doubleRecordType varchar,
  exceedFlag varchar,
  fundAccount varchar,
  fundCode varchar,
  fundCompany varchar,
  fundName varchar,
  fundRecruitmentFlag varchar,
  id varchar,
  lastUpdateTime varchar,
  opBranchNo varchar,
  opStation varchar,
  orderNo varchar,
  orgEntrustNo varchar,
  orgOrderNo varchar,
  prodId varchar,
  prodInvestorType varchar,
  prodLeafType varchar,
  prodRisk varchar,
  prodRiskFlag varchar,
  prodRootType varchar,
  prodTerm varchar,
  prodVariety varchar,
  quaInvestorFlag varchar,
  quaInvestorSource varchar,
  quickPurchaseFlag varchar,
  remark varchar,
  remark1 varchar,
  remark2 varchar,
  remark3 varchar,
  riskFlag varchar,
  scRcvTime varchar,
  scSendTime varchar,
  signId varchar,
  signSpecialRiskFlag varchar,
  source varchar,
  status varchar,
  subRiskFlag varchar,
  sysNodeId varchar,
  taSerialNo varchar,
  termFlag varchar,
  token varchar,
  tradeConfirmDate varchar,
  transFundCode varchar,
  transProdId varchar,
  varietyFlag varchar,
  zlcftProdType varchar,
  proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
)
with
(
  'connector.type' = 'kafka',  -- 使用 kafka connector
  'connector.version' = '0.10',  -- kafka 版本,universal 支持 0.11 以上的版本
  'connector.topic' = '',  -- kafka topic
  'connector.startup-mode' = 'group-offsets',  -- 从起始 offset 开始读取
  'connector.properties.zookeeper.connect' = '',  -- zookeeper 地址
  'connector.properties.bootstrap.servers' = '',  -- kafka 
broker 地址
  'connector.properties.group.id' = 'acrm-realtime-saleorder-consumer-1',
  'format.type' = 'json'  -- 数据源格式为 json
)







CREATE TABLE dim_app_cust_info (
cust_id varchar ,
open_comp_name varchar ,
open_comp_id varchar ,
org_name varchar ,
org_id varchar,
comp_name varchar ,
comp_id varchar ,
mng_name varchar ,
mng_id varchar ,
is_tg varchar ,
cust_name varchar ,
cust_type varchar,
avg_tot_aset_y365 double ,
avg_aset_create_y double
) WITH (
'connector.type' = 'jdbc',
'connector.url' = '',
'connector.table' = 'app_cust_serv_rel_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'admin',
'connector.password' = 'Windows7',
'connector.lookup.cache.max-rows' = '8000',
'connector.lookup.cache.ttl' = '30min',
'connector.lookup.max-retries' = '3'
)








At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>Could you also provide us the DDL for lscsp_sc_order_all
>and dim_app_cust_info ?
>
>On Sun, Mar 1, 2020 at 9:22 PM sunfulin wrote:
>
>>
>> CREATE TABLE realtime_product_sell (
>>   sor_pty_id varchar,
>>   entrust_date varchar,
>>   entrust_time varchar,
>>   product_code varchar,
>>   business_type varchar,
>>   balance double,
>>   cust_name varchar,
>>   open_comp_name varchar,
>>   open_comp_id varchar,
>>   org_name varchar,
>>   org_id varchar,
>>   comp_name varchar,
>>   comp_id varchar,
>>   mng_name varchar,
>>   mng_id varchar,
>>   is_tg varchar,
>>   cust_type varchar,
>>   avg_tot_aset_y365 double,
>>   avg_aset_create_y double
>> ) WITH (
>>   'connector.type' = 'elasticsearch',
>>   'connector.version' = '',
>>   'connector.hosts' = '',
>>   'connector.index' = 'realtime_product_sell_007118',
>>   'connector.document-type' = '_doc',
>>   'update-mode' = 'upsert',
>>   'connector.key-delimiter' = '$',
>>   'connector.key-null-literal' = 'n/a',
>>   'connector.bulk-flush.interval' = '1000',
>>   'format.type' = 'json'
>> )
>>
>>
>>
>>
>>
>> At 2020-03-01 21:08:08, "Benchao Li"  wrote:
>> >The UDF looks good. Could you also paste your DDL? Then we can produce your
>> >bug easily.
>> >
>> >On Sun, Mar 1, 2020 at 6:39 PM sunfulin wrote:
>> >
>> >> Below is the code. The function converts the original field timeStr
>> >> "2020-03-01 12:01:00.234" to the target timeStr according to dayTag.
>> >>
>> >> public class ts2Date extends ScalarFunction {
>> >>     public ts2Date() {
>> >>     }
>> >>
>> >>     public String eval(String timeStr, boolean dayTag) {
>> >>         if (timeStr == null) {
>> >>             return null;
>> >>         }
>> >>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>> >>         Date date = new Date();
>> >>         try {
>> >>             date = ortSf.parse(timeStr);
>> >>         } catch (ParseException e) {
>> >>             e.printStackTrace();
>> >>             return null;
>> >>         }
>> >>         if (dayTag) {
>> >>             String format = "yyyy-MM-dd";
>> >>             SimpleDateFormat sf = new SimpleDateFormat(format);
>> >>             return sf.format(date);
>> >>         } else {
>> >>             String format =

Re: Providing hdfs name node IP for streaming file sink

2020-03-01 Thread Yang Wang
Hi Nick,

Certainly, you can use "namenode:port" directly as the authority of your HDFS
path.
Then the Hadoop configs (e.g. core-site.xml, hdfs-site.xml) are not
necessary.
However, that also means you cannot benefit from HDFS
high availability [1].

If your HDFS cluster is configured for HA, I strongly suggest setting
"HADOOP_CONF_DIR" for your Flink application. It needs to be set on both the
client side and the cluster (JM/TM) side. Then your HDFS path can be
specified like this: "hdfs://myhdfs/flink/test", given that "myhdfs"
is the name service configured in hdfs-site.xml.
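
To make the difference between the two path styles concrete, here is a minimal plain-Java sketch that only inspects the URI, without any Hadoop or Flink dependency. The class and method names are made up for illustration. The port check is only a heuristic and an assumption of this example: a path without a port may still name a single NameNode on the default port, so hdfs-site.xml remains the source of truth for whether "myhdfs" is a name service.

```java
import java.net.URI;

// Hypothetical helper for illustration; not part of Flink or Hadoop.
class HdfsPathInspector {

    /** Returns the authority part of the path, e.g. "namenode:8020" or "myhdfs". */
    static String authority(String path) {
        return URI.create(path).getAuthority();
    }

    /** True if the path carries an explicit port, i.e. it pins one NameNode address. */
    static boolean pinsSingleNameNode(String path) {
        return URI.create(path).getPort() != -1;
    }

    public static void main(String[] args) {
        String[] paths = {"hdfs://namenode:8020/flink/test", "hdfs://myhdfs/flink/test"};
        for (String path : paths) {
            System.out.println(path + " -> authority=" + authority(path)
                    + ", pins a single NameNode=" + pinsSingleNameNode(path));
        }
    }
}
```

A path that pins one NameNode keeps working only while that NameNode is active, which is why the name-service form is preferable on an HA cluster.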


Best,
Yang



[1].
http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html

On Sat, Feb 29, 2020 at 6:00 AM Nick Bendtner wrote:

> To add to this question, do I need to setup env.hadoop.conf.dir to point
> to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/ for
> the jvm ? Or is it possible to write to hdfs without any external hadoop
> config like core-site.xml, hdfs-site.xml ?
>
> Best,
> Nick.
>
>
>
> On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner  wrote:
>
>> Hi guys,
>> I am trying to write to hdfs from streaming file sink. Where should I
>> provide the IP address of the name node ? Can I provide it as a part of the
>> flink-config.yaml file or should I provide it like this :
>>
>> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>     .forBulkFormat(new Path("hdfs://namenode:8020/flink/test"),
>>         ParquetAvroWriters.forGenericRecord(schema))
>>     .build();
>>
>>
>> Best,
>> Nick
>>
>>
>>


Re: Get Tumbling Window Top-K using SQL

2020-03-01 Thread Lu Weizheng
Sorry guys,

I found a solution on the wiki for Top-N using the Blink planner.


SELECT [column_list]
FROM (
   SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]
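
As a rough illustration of what this query pattern computes when the partition key is a tumbling window, here is a plain-Java sketch with no Flink dependency. The `Event` type, its field names, and ranking by a descending score are assumptions made for the example. It mirrors ROW_NUMBER() semantics on a finite list and says nothing about how the Blink planner actually executes the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration of "Top-N per tumbling window"; not Flink code.
class WindowTopN {

    /** Hypothetical event: a timestamp in milliseconds, a name, and the score to rank by. */
    static final class Event {
        final long timestampMs;
        final String name;
        final long score;

        Event(long timestampMs, String name, long score) {
            this.timestampMs = timestampMs;
            this.name = name;
            this.score = score;
        }
    }

    /** Start of the tumbling window that contains the timestamp (the PARTITION BY key). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Groups events by window, orders each group by score descending, keeps the first n. */
    static Map<Long, List<Event>> topNPerWindow(List<Event> events, long windowSizeMs, int n) {
        Map<Long, List<Event>> byWindow = new TreeMap<>();
        for (Event e : events) {
            byWindow.computeIfAbsent(windowStart(e.timestampMs, windowSizeMs),
                    k -> new ArrayList<>()).add(e);
        }
        Map<Long, List<Event>> result = new TreeMap<>();
        for (Map.Entry<Long, List<Event>> entry : byWindow.entrySet()) {
            result.put(entry.getKey(), entry.getValue().stream()
                    .sorted(Comparator.comparingLong((Event ev) -> ev.score).reversed())
                    .limit(n) // corresponds to WHERE rownum <= N
                    .collect(Collectors.toList()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000L, "a", 5), new Event(2_000L, "b", 9),
                new Event(59_000L, "c", 7), new Event(61_000L, "d", 1));
        topNPerWindow(events, 60_000L, 2).forEach((start, top) -> {
            for (int i = 0; i < top.size(); i++) {
                System.out.println("window=" + start + " rownum=" + (i + 1) + " " + top.get(i).name);
            }
        });
    }
}
```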

Thanks anyway.

From: Lu Weizheng
Sent: March 1, 2020 17:48
To: user@flink.apache.org
Subject: Get Tumbling Window Top-K using SQL

Hi,

I found a question on StackOverflow
(https://stackoverflow.com/questions/49191326/flink-stream-sql-order-by)
about how to get the Top-K using Flink SQL. The answer was written by Fabian
back in 2018.
The main idea is to use RANK to get the Top-K of field 'a':

SELECT a, b, c
FROM (
  SELECT
    a, b, c,
    RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank
  FROM yourTable)
WHERE rank <= 10

Is there a better way to get the tumbling-window Top-K items now?

Also, the wiki page on dynamic tables may need an update:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/dynamic_tables.html

On that page, I don't understand why the query orders by a field 'lastLogin'
when the subquery only produces 'lastAction':


SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

Thanks!


Re: Timeout error in ZooKeeper

2020-03-01 Thread Yang Wang
Hi Samir,

It seems that your ZooKeeper connection timeout is set to 3000 ms, and the
client could not connect to the server for 14305 ms, maybe due to a full GC
or a network problem. When it reconnects, the "ConnectionLossException" is
thrown.


Have you changed any of the ZooKeeper client timeout settings in Flink?
Or could you confirm the timeout settings on the ZooKeeper server side?


Best,
Yang

On Sun, Mar 1, 2020 at 12:57 AM Samir Tusharbhai Chauhan wrote:

> Hi @Till Rohrmann,
>
> Thanks for the response. Unfortunately I could not capture much of the log
> on the Flink side. I am still attaching whatever I could collect.
>
> I found this old ticket about the same error. I am not sure whether it is
> related.
>
> https://issues.apache.org/jira/browse/ZOOKEEPER-1582
>
> Somewhere I also read that it could be related to ZNodes containing too
> much data or having too many children. By default ZooKeeper has a 1 MB
> transport limit.
>
> Warm Regards,
>
> *Samir Chauhan*
>
>
> *From:* Till Rohrmann
> *Sent:* Saturday, February 29, 2020 11:28 PM
> *To:* Samir Tusharbhai Chauhan
> *Cc:* user@flink.apache.org
> *Subject:* Re: Timeout error in ZooKeeper
>
> Hi Samir,
>
> it is hard to tell what exactly happened without the Flink logs. However,
> newer Flink versions include some ZooKeeper improvements and fixes for some
> bugs [1]. Hence, it might make sense to try to upgrade your Flink version.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14091
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 7:41 PM Samir Tusharbhai Chauhan <
> samir.tusharbhai.chau...@prudential.com.sg> wrote:
>
> *Hi,*
>
> Yesterday morning I got the error below in ZooKeeper. After this error,
> Flink did not reconnect to ZK and the jobs hung. I had to cancel and
> redeploy all my jobs to bring them back to a normal state.
>
> 2020-02-28 02:45:56,811 [myid:1] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of stream exception
> EndOfStreamException: Unable to read additional data from client sessionid 0x1701028573403f3, likely client has closed socket
>     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
>     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
>     at java.lang.Thread.run(Thread.java:748)
>
> At the same time I saw the error below in Flink.
>
> 2020-02-28 02:46:49,095 ERROR org.apache.curator.ConnectionState - Connection timed out for connection string (zk-cs:2181) and timeout (3000) / elapsed (14305)
> org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
>     at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
>     at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
>     at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Has anyone faced a similar error before?
>
> *My environment is*
>
> Azure Kubernetes 1.15.7
> Flink 1.6.0
> Zookeeper 3.4.10
>
>
>
> Warm Regards,
>
> *Samir Chauhan*

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

2020-03-01 Thread Yang Wang
Hi Niels,

You are right. The S3-related configuration you set in your `main()` only
applies on the client side, because the filesystem is initialized only once,
in the entrypoint of the JM/TM. AFAIK, we cannot provide different
credentials for each job in the same session cluster.
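
The underlying Java behaviour can be reproduced without Flink. The following is a minimal sketch under stated assumptions: `StaticStateDemo`, `Task`, and the `endpoint` field are hypothetical stand-ins, and plain Java serialization is used only to show that static state is not part of what gets shipped. It does not reproduce Flink's actual job submission path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; not Flink code.
class StaticStateDemo {

    // Stands in for process-wide state, such as a statically initialized file system config.
    static String endpoint;

    /** Stands in for user code that is serialized on the client and shipped to the cluster. */
    static final class Task implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Task(String name) {
            this.name = name;
        }
    }

    /** Serializes the task with standard Java serialization. */
    static byte[] serialize(Task task) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Checks whether the serialized form contains the given ASCII text. */
    static boolean contains(byte[] data, String text) {
        return new String(data, StandardCharsets.ISO_8859_1).contains(text);
    }

    public static void main(String[] args) {
        endpoint = "s3.example.nl"; // set on the "client side" only
        byte[] shipped = serialize(new Task("reader"));
        System.out.println("instance field shipped: " + contains(shipped, "reader"));
        System.out.println("static field shipped:   " + contains(shipped, "s3.example.nl"));
    }
}
```

Whatever is configured through static state in the client JVM therefore has to be provided to the JM/TM processes separately, for example through the cluster configuration.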

Best,
Yang

On Fri, Feb 28, 2020 at 11:09 PM Niels Basjes wrote:

> Hi,
>
> As I mentioned in my original email I already verified that the endpoints
> were accessible from the pods, that was not the problem.
>
> It took me a while but I've figured out what went wrong.
>
> Setting the configuration like I did
>
> final Configuration conf = new Configuration();
> conf.setString("presto.s3.endpoint",   "s3.example.nl");
> conf.setString("presto.s3.access-key", "myAccessKey");
> conf.setString("presto.s3.secret-key", "mySecretKey");
> FileSystem.initialize(conf, null);
>
> sets it in some static variables that do not get serialized and shipped
> into the task managers.
>
> As a consequence, under the absence of credentials the AWS/S3 client
> assumes it is running inside AWS and that it can retrieve the credentials
> from http://169.254.170.2  (which is non routable)
> Because this is not AWS it cannot do this and I get the error it cannot
> connect.
>
> For now my solution is to start the Flink Session with this
> #!/bin/bash
> ./flink-1.10.0/bin/kubernetes-session.sh \
>   -Dkubernetes.cluster-id=flink1100 \
>   -Dtaskmanager.memory.process.size=8192m \
>   -Dkubernetes.taskmanager.cpu=2 \
>   -Dtaskmanager.numberOfTaskSlots=4 \
>   -Dresourcemanager.taskmanager-timeout=360 \
>   -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
>   -Dpresto.s3.endpoint=s3.example.nl \
>   -Dpresto.s3.access-key=MyAccessKey \
>   -Dpresto.s3.secret-key=MySecretKey \
>   -Dpresto.s3.path.style.access=true
>
> I dislike this because now ALL jobs in this Flink cluster have the same
> credentials.
>
> Is there a way to set the S3 credentials on a per job or even per
> connection basis?
>
> Niels Basjes
>
>
> On Fri, Feb 28, 2020 at 4:38 AM Yang Wang  wrote:
>
>> Hi Niels,
>>
>> Glad to hear that you are trying the Flink native K8s integration and
>> sharing your feedback.
>>
>>> What is causing the differences in behavior between local and in k8s? It
>>> works locally but not in the cluster.
>>
>> In your case, the job runs successfully locally. That means the S3
>> endpoint can be accessed from your local network environment. When you
>> submit the job to the K8s cluster, the user `main()` is executed locally
>> to build the job graph, which is then submitted to the cluster for
>> execution. There, the S3 endpoint is accessed from within the K8s network.
>> So maybe there is something wrong with the network between the
>> taskmanager and the S3 endpoint.
>>
>>> How do I figure out what network it is trying to reach in k8s?
>>
>> I am not an S3 expert, so I am not sure whether the SDK fetches the
>> credentials from the S3 endpoint. If it does, I think you need to find out
>> which taskmanager the source operator is running on. Then exec into the
>> Pod and use nslookup/curl to make sure the endpoint "s3.example.nl" can be
>> resolved and accessed successfully.
>>
>>
>>
>> Best,
>> Yang
>>
>>
>> On Fri, Feb 28, 2020 at 4:56 AM Niels Basjes wrote:
>>
>>> Hi,
>>>
>>> I have a problem with accessing my own S3 system from within Flink when
>>> running on Kubernetes.
>>>
>>> *TL;DR* I have my own S3 (Ceph), Locally my application works, when
>>> running in K8s it fails with
>>>
>>> Caused by: com.amazonaws.SdkClientException: Unable to load credentials
>>> from service endpoint
>>> Caused by: java.net.SocketException: Network is unreachable (connect
>>> failed)
>>>
>>>
>>> I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
>>> the S3 gateway that is included in it.
>>> I have put a file on this 'S3' and in my Flink 1.10.0 application I do
>>> this:
>>>
>>> StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> final Configuration conf = new Configuration();
>>> conf.setString("presto.s3.endpoint",   "s3.example.nl");
>>> conf.setString("presto.s3.access-key", "myAccessKey");
>>> conf.setString("presto.s3.secret-key", "mySecretKey");
>>> FileSystem.initialize(conf, null);
>>>
>>> senv.setParallelism(2);
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> DataStream<String> rawInputStream = senv
>>>     .readTextFile(path).name("Read input");
>>>
>>> ...
>>>
>>>
>>> The s3.example.nl is the hostname of the ingress I have attached to the
>>> S3 endpoint. In my case it is accessible via both http and https (with a
>>> valid LetsEncrypt certificate).
>>>
>>> When I run this locally from within IntelliJ it works like a charm,
>>> reads the data, does some stuff with it and then writes it to ElasticSearch.
>>>
>>> I have created an additional layer to enable the fs-s3-presto plugin
>>> with 

Re: Kubernetes Error Code

2020-03-01 Thread Yang Wang
Hi Samir,

I assume that you are running a Flink standalone session or per-job cluster
on Kubernetes. Both of them start the JM/TM process in the foreground, so
you can use `kubectl logs {pod_name}` to get the logs.
The exit code alone is not enough to find the root cause.


Best,
Yang

On Sun, Mar 1, 2020 at 11:08 PM Samir Tusharbhai Chauhan wrote:

> *Hi,*
>
> *Does anyone know what the error code below means? Our Flink pod got
> restarted and we see the following when we edit the pod.*
>
> Warm Regards,
>
> *Samir Chauhan*


Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread Benchao Li
Could you also provide us the DDL for lscsp_sc_order_all
and dim_app_cust_info ?

On Sun, Mar 1, 2020 at 9:22 PM sunfulin wrote:

>
> CREATE TABLE realtime_product_sell (
>   sor_pty_id varchar,
>   entrust_date varchar,
>   entrust_time varchar,
>   product_code varchar,
>   business_type varchar,
>   balance double,
>   cust_name varchar,
>   open_comp_name varchar,
>   open_comp_id varchar,
>   org_name varchar,
>   org_id varchar,
>   comp_name varchar,
>   comp_id varchar,
>   mng_name varchar,
>   mng_id varchar,
>   is_tg varchar,
>   cust_type varchar,
>   avg_tot_aset_y365 double,
>   avg_aset_create_y double
> ) WITH (
>   'connector.type' = 'elasticsearch',
>   'connector.version' = '',
>   'connector.hosts' = '',
>   'connector.index' = 'realtime_product_sell_007118',
>   'connector.document-type' = '_doc',
>   'update-mode' = 'upsert',
>   'connector.key-delimiter' = '$',
>   'connector.key-null-literal' = 'n/a',
>   'connector.bulk-flush.interval' = '1000',
>   'format.type' = 'json'
> )
>
>
>
>
>
> At 2020-03-01 21:08:08, "Benchao Li"  wrote:
> >The UDF looks good. Could you also paste your DDL? Then we can produce your
> >bug easily.
> >
> >On Sun, Mar 1, 2020 at 6:39 PM sunfulin wrote:
> >
> >> Below is the code. The function converts the original field timeStr
> >> "2020-03-01 12:01:00.234" to the target timeStr according to dayTag.
> >>
> >> public class ts2Date extends ScalarFunction {
> >>     public ts2Date() {
> >>     }
> >>
> >>     public String eval(String timeStr, boolean dayTag) {
> >>         if (timeStr == null) {
> >>             return null;
> >>         }
> >>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> >>         Date date = new Date();
> >>         try {
> >>             date = ortSf.parse(timeStr);
> >>         } catch (ParseException e) {
> >>             e.printStackTrace();
> >>             return null;
> >>         }
> >>         if (dayTag) {
> >>             String format = "yyyy-MM-dd";
> >>             SimpleDateFormat sf = new SimpleDateFormat(format);
> >>             return sf.format(date);
> >>         } else {
> >>             String format = "yyyy-MM-dd'T'HH:mm:00.000+0800";
> >>             SimpleDateFormat sf = new SimpleDateFormat(format);
> >>             return sf.format(date);
> >>         }
> >>     }
> >> }
> >>
> >>
> >>
> >> At 2020-03-01 18:14:30, "Benchao Li"  wrote:
> >>
> >> Could you show how your UDF `ts2Date` is implemented?
> >>
> >> On Sun, Mar 1, 2020 at 6:05 PM sunfulin wrote:
> >>
> >>> Hi, Benchao,
> >>> Thanks for the reply.
> >>>
> >>> Could you provide us more information?
> >>> 1. what planner are you using? blink or legacy planner?
> >>> I am using the Blink planner. I have not tested with the legacy planner
> >>> because my program depends on a lot of new features of the Blink planner.
> >>> 2. how do you register your UDF?
> >>> Just with this code: tableEnv.registerFunction("ts2Date", new ts2Date());
> >>> tableEnv is a StreamTableEnvironment.
> >>> 3. does this have a relation with checkpointing? what if you enable
> >>> checkpointing and not use your udf? and disable checkpointing and use udf?
> >>> I don't think this is related to checkpointing. If I enable checkpointing
> >>> and do not use my UDF, I do not see any exception and the job submits
> >>> successfully. If I disable checkpointing and use the UDF, the job also
> >>> submits successfully.
> >>>
> >>> I dug into this exception a lot. Maybe it is related to some
> >>> classloader issue. I hope for your suggestions.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> At 2020-03-01 17:54:03, "Benchao Li" wrote:
> >>>
> >>> Hi fulin,
> >>>
> >>> It seems like a bug in the code generation.
> >>>
> >>> Could you provide us more information?
> >>> 1. what planner are you using? blink or legacy planner?
> >>> 2. how do you register your UDF?
> >>> 3. does this have a relation with checkpointing? what if you enable
> >>> checkpointing and not use your udf? and disable checkpointing and use udf?
> >>>
> >>> On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:
> >>>
> >>>> Hi, guys
> >>>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to
> >>>> Elasticsearch. In my SQL logic, I use a UDF, ts2Date, to handle
> >>>> date-format stream fields. However, when I add
> >>>> `env.enableCheckpointing(time)`, my job fails to submit and throws an
> >>>> exception like the following. This is really weird, because when I
> >>>> remove the UDF, the job submits successfully. Any suggestion is highly
> >>>> appreciated. My SQL logic is like:
> >>>>
> >>>> INSERT INTO realtime_product_sell
> >>>> select U.sor_pty_id,
> >>>>        U.entrust_date,
> >>>>        U.entrust_time,
> >>>>        U.product_code,
> >>>>        U.business_type,
> >>>>        sum(cast(U.balance as double)) as balance,
> >>>>        COALESCE(C.cust_name, '--') as cust_name,
> >>>>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
> >>>>        COALESCE(C.open_comp_id,

Re: Operator latency metric not working in 1.9.1

2020-03-01 Thread Rafi Aroch
Hi Ori,

Make sure that latency tracking is enabled; it is disabled by default. Also
check that the scope is set properly.

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#metrics-latency-interval

Thanks,
Rafi

On Sun, Mar 1, 2020, 13:32 Ori Popowski  wrote:

> We've been working with Flink 1.5.2 with Prometheus and ever since we
> upgraded to version 1.9.1 the operator latency metric is not available
> anymore (
> flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency
> )
>
> I've seen in the docs that this metric is still supported. Does anyone
> know why it happens?
>
> Thanks
>
>
>


[Question] enable end2end Kafka exactly once processing

2020-03-01 Thread Jin Yi
Hi experts,

My application uses Apache Beam with Flink as the runner. My
source and sink are Kafka topics, and I am using the KafkaIO connector
provided by Apache Beam to consume and publish.

I am reading through Beam's Javadoc:
https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-

It looks like Beam does not support the Flink runner for EOS. Can someone
please shed some light on how to enable exactly-once processing with
Apache Beam?

Thanks a lot!
Eleanore


Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread kant kodali
* What went wrong:
Could not determine the dependencies of task ':shadowJar'.
> Could not resolve all dependencies for configuration ':flinkShadowJar'.
   > Could not find org.apache.flink:flink-sql-connector-kafka_2.11:universal.
     Searched in the following locations:
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
     Required by:
         project :



On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz 
wrote:

> Hi Kant,
>
> If you want to use the *universal *kafka connector you use "universal"
> for the version. The community decided to no longer distinguish different
> kafka connector versions, but to use the newest kafka client version for
> all versions of kafka 1.0+. So if you want to use the connector from
> flink-sql-connector-kafka_2.11 use "universal" for the version.
>
> As for the collect/print sink. We do realize importance of the sink and
> there were a few approaches to implement one. Including the TableUtils
> mentioned by godfrey. It does not have strong consistency guarantees and is
> recommended rather only for experiments/testing. There is also an ongoing
> discussion how to implement such a sink for *both *batch and streaming
> here:
> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17046455#comment-17046455
>
> Best,
>
> Dawid
> On 01/03/2020 12:00, kant kodali wrote:
>
> Hi Benchao,
>
> That worked! Pasting the build.gradle file here. However, this only works
> for 0.11, and it needs zookeeper.connect(), which shouldn't be required. I
> am not sure why the Flink Kafka connector requires it. If I change the
> version to 2.2 in the code and specify this jar
>
> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
> or
>
> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" // Not sure if I should use this one for Kafka >= 0.11
>
> it doesn't work either.
>
>
> buildscript {
>     repositories {
>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>     }
>     dependencies {
>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>     }
> }
>
> plugins {
>     id 'java'
>     id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> ext {
>     javaVersion = '1.8'
>     flinkVersion = '1.10.0'
>     scalaBinaryVersion = '2.11'
>     slf4jVersion = '1.7.7'
>     log4jVersion = '1.2.17'
> }
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
>     options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
>     mavenCentral()
>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
> configurations {
>     flinkShadowJar // dependencies which go into the shadowJar
>
>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>     flinkShadowJar.exclude group: 'org.slf4j'
>     flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
>     // --
>     // Compile-time dependencies that should NOT be part of the
>     // shadow jar and are provided in the lib folder of Flink
>     // --
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
>     // --
>     // Dependencies that should be part of the shadow jar, e.g.
>     // connectors. These must be in the flinkShadowJar configuration!
>     //
> 

[ANNOUNCE] Weekly Community Update 2020/09

2020-03-01 Thread Konstantin Knauf
Dear community,

happy to share this week's community update. It was a relatively quiet week
on the dev@ mailing list (mostly votes on previously covered FLIPs), but
there is always something to share. Additionally, I have decided to also
feature *flink-packages.org* in this newsletter
going forward. Depending on the level of activity, I will cover newly added
packages or introduce one of the existing packages.

Flink Development
=================

* [sql] Dawid has started a discussion to enable Table API/SQL sources to
read columns from different parts of source records. With this it would,
for example, be possible to read partition, timestamp or offset from a
Kafka source record. Similarly, it would be possible to specify override
partitioning when writing to Kafka or Kinesis. [1]

* [sql, python] FLIP-58 introduced Python UDFs in SQL and Table API.
FLIP-79 added a Function DDL in Flink SQL to register Java & Scala UDFs in
pure SQL. Based on these two FLIPs, Wei Zhong published FLIP-106 to also
support Python UDFs in the SQL Function DDL. [2]

* [development] Chesnay started a discussion on Eclipse support for Apache
Flink (framework) development. If you are using Eclipse as an Apache Flink
contributor, please get involved in the thread. [3]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-tp38277.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-106-Support-Python-UDF-in-SQL-Function-DDL-tp38107.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Remove-Eclipse-specific-plugin-configurations-tp38255.html

Notable Bugs
============

[FLINK-16262] [1.10.0] The FlinkKafkaProducer cannot be used in
EXACTLY_ONCE mode when using the user code classloader. For application
clusters (per-job clusters) you can work around this issue by using the
system classloader (user jar in the lib/ directory). Will be fixed in 1.10.1.
[4]

[4] https://issues.apache.org/jira/browse/FLINK-16262

flink-packages.org
==================

DTStack, a Chinese cloud technology company, has recently published FlinkX
[5] on flink-packages.org. The documentation is in Chinese only, but it seems
to be a configuration-based integration framework based on Apache Flink
with an impressive set of connectors.

[5] https://flink-packages.org/packages/flinkx

Events, Blog Posts, Misc
========================

* This week I stumbled across this Azure tutorial to use Event Hubs with
Apache Flink. [6]
* Gökce Sürenkök has written a blog post on setting up a highly available
Flink cluster on Kubernetes based on Zookeeper for Flink Master failover
and HDFS as checkpoint storage. [7]

* Upcoming Meetups
    * On March 5th, Stephan Ewen will talk about Apache Flink Stateful
      Functions at the Utrecht Data Engineering Meetup. [8]
    * On March 12th, Prateep Kumar will host an online event comparing
      Kafka Streams and Apache Flink. [9]
    * On April 22nd, Ververica will host the next Apache Flink meetup in
      Berlin. [10]
    * Cloudera is hosting a couple of "Future of Data" events on stream
      processing with Apache Flink in
        * Vienna (March 4th, full-day workshop) [11]
        * Zurich (March 10th, full-day workshop) [12]
        * New Jersey (May 5th, meetup) [13]

[6]
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-kafka-flink-tutorial
[7]
https://medium.com/hepsiburadatech/high-available-flink-cluster-on-kubernetes-setup-73b2baf9200e
[8] https://www.meetup.com/Data-Engineering-NL/events/268424399/
[9]
https://www.meetup.com/apache-flink-aws-kinesis-hyd-india/events/268930388/
[10] https://www.meetup.com/Apache-Flink-Meetup/events/269005339/
[11] https://www.meetup.com/futureofdata-vienna/events/268418974/
[12] https://www.meetup.com/futureofdata-zurich/events/268423809/
[13] https://www.meetup.com/futureofdata-princeton/events/268830725/

Cheers,

Konstantin (@snntrable)

-- 

Konstantin Knauf | Head of Product

+49 160 91394525




Re: Exceptions in Web UI do not appear in logs

2020-03-01 Thread orips
Hi,

It's version 1.5.2.

I actually found the place in the code responsible for it.
In the "catch" block, it doesn't log the error and it lets it propagate.

https://github.com/apache/flink/blob/62839e88e15b338a8af9afcef698c38a194c592f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
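
If the goal is to have such failures show up in the logs as well, the usual pattern is to log inside the catch block and then rethrow. A minimal sketch, not the actual `ShardConsumer` code: `runLoop` and its `Runnable` argument are hypothetical stand-ins, and `java.util.logging` is used only to keep the example free of dependencies (Flink itself logs through SLF4J).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogAndRethrowSketch {
    private static final Logger LOG = Logger.getLogger(LogAndRethrowSketch.class.getName());

    // Hypothetical stand-in for a consumer loop: runs one step and makes
    // sure a failure is written to the log before it propagates.
    static void runLoop(Runnable step) {
        try {
            step.run();
        } catch (RuntimeException e) {
            // Log first, so the stack trace ends up in the task's log file...
            LOG.log(Level.SEVERE, "Consumer loop failed", e);
            // ...then rethrow, so the caller still sees the original failure.
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            runLoop(() -> { throw new IllegalStateException("simulated failure"); });
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

The exception still reaches the caller unchanged, so whatever is reported upstream stays as before; the only difference is the additional log entry.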







Re: Single stream, two sinks

2020-03-01 Thread miki haiat
So you have a RabbitMQ source and an HTTP sink?
If so, you can use a side output to write your data to the DB.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On Sat, Feb 29, 2020, 23:01 Gadi Katsovich  wrote:

> Hi,
> I'm new to flink and am evaluating it to replace our existing streaming
> application.
> The use case I'm working on is reading messages from RabbitMQ queue,
> applying some transformation and filtering logic and sending it via HTTP to
> a 3rd party.
> A must-have requirement of this flow is to write the data that was sent
> to an SQL db, for audit and troubleshooting purposes.
> I'm currently basing my HTTP solution on a PR with needed adjustments:
> https://github.com/apache/flink/pull/5866/files
> How can I add an insertion to a DB after a successful HTTP request?
> Thank you.
>


Kubernetes Error Code

2020-03-01 Thread Samir Tusharbhai Chauhan


Hi,
Does anyone know what the error code below means? Our Flink pod got restarted, and we
see the following when we edit the pod.
[inline image image001.png not included]
Warm Regards,
Samir Chauhan




Re:Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin



CREATE TABLE realtime_product_sell (
  sor_pty_id varchar,
  entrust_date varchar,
  entrust_time varchar,
  product_code varchar ,
  business_type varchar ,
  balance double ,
  cust_name varchar ,
  open_comp_name varchar ,
  open_comp_id varchar ,
  org_name varchar ,
  org_id varchar ,
  comp_name varchar ,
  comp_id varchar ,
  mng_name varchar ,
  mng_id varchar ,
  is_tg varchar ,
  cust_type varchar ,
  avg_tot_aset_y365 double ,
  avg_aset_create_y double
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '',
'connector.hosts' = '',
'connector.index' = 'realtime_product_sell_007118',
'connector.document-type' = '_doc',
'update-mode' = 'upsert',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.interval' = '1000',
'format.type' = 'json'
)











At 2020-03-01 21:08:08, "Benchao Li"  wrote:
>The UDF looks good. Could you also paste your DDL? Then we can reproduce your
>bug easily.
>
>On Sun, Mar 1, 2020 at 6:39 PM sunfulin wrote:
>
>> Below is the code. The function converts the original field timeStr
>> "2020-03-01 12:01:00.234" to the target timeStr according to dayTag.
>>
>> import java.text.ParseException;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import org.apache.flink.table.functions.ScalarFunction;
>>
>> public class ts2Date extends ScalarFunction {
>>     public ts2Date() {
>>     }
>>
>>     public String eval(String timeStr, boolean dayTag) {
>>         if (timeStr == null) {
>>             return null;
>>         }
>>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>>         Date date = new Date();
>>         try {
>>             date = ortSf.parse(timeStr);
>>         } catch (ParseException e) {
>>             e.printStackTrace();
>>             return null;
>>         }
>>         if (dayTag) {
>>             String format = "yyyy-MM-dd";
>>             SimpleDateFormat sf = new SimpleDateFormat(format);
>>             return sf.format(date);
>>         } else {
>>             String format = "yyyy-MM-dd'T'HH:mm:00.000+0800";
>>             SimpleDateFormat sf = new SimpleDateFormat(format);
>>             return sf.format(date);
>>         }
>>     }
>> }
>>
>>
>>
>> At 2020-03-01 18:14:30, "Benchao Li"  wrote:
>>
>> Could you show how your UDF `ts2Date` is implemented?
>>
>> On Sun, Mar 1, 2020 at 6:05 PM sunfulin wrote:
>>
>>> Hi, Benchao,
>>> Thanks for the reply.
>>>
>>> Could you provide us more information?
>>> 1. what planner are you using? blink or legacy planner?
>>> I am using the Blink planner. I have not tested with the legacy planner
>>> because my program depends on many new features of the Blink planner.
>>> 2. how do you register your UDF?
>>> I just use this code: tableEnv.registerFunction("ts2Date", new
>>> ts2Date()); tableEnv is a StreamTableEnvironment.
>>> 3. does this have a relation with checkpointing? what if you enable
>>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>> I don't think this is related to checkpointing. If I enable checkpointing
>>> and do not use my UDF, I do not see any exception and the job submits
>>> successfully. If I disable checkpointing and use the UDF, the job also
>>> submits successfully.
>>>
>>> I have dug into this exception a lot. Maybe it is related to some
>>> classloader issue. I hope for your suggestions.
>>>
>>>
>>>
>>>
>>>
>>> At 2020-03-01 17:54:03, "Benchao Li" wrote:
>>>
>>> Hi fulin,
>>>
>>> It seems like a bug in the code generation.
>>>
>>> Could you provide us more information?
>>> 1. what planner are you using? blink or legacy planner?
>>> 2. how do you register your UDF?
>>> 3. does this have a relation with checkpointing? what if you enable
>>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>>
>>> On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:
>>>
>>>> Hi, guys
>>>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch.
>>>> In my SQL logic, I am using a UDF, ts2Date, to format date fields of the
>>>> stream. However, when I add `env.enableCheckpointing(time)`, my job
>>>> fails to submit and throws an exception like the following. This is really
>>>> weird, because when I remove the UDF, the job submits successfully. Any
>>>> suggestion is highly appreciated. My SQL logic is as follows:
>>>>
>>>> INSERT INTO realtime_product_sell
>>>> select U.sor_pty_id,
>>>>        U.entrust_date,
>>>>        U.entrust_time,
>>>>        U.product_code,
>>>>        U.business_type,
>>>>        sum(cast(U.balance as double)) as balance,
>>>>        COALESCE(C.cust_name, '--') as cust_name,
>>>>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
>>>>        COALESCE(C.open_comp_id, '--') as open_comp_id,
>>>>        COALESCE(C.org_name, '--') as org_name,
>>>>        COALESCE(C.org_id, '--') as comp_name,
>>>>        COALESCE(C.comp_name, '--') AS comp_name,
>>>>        COALESCE(C.comp_id, '--') as comp_id,
>>>>        COALESCE(C.mng_name, '--') as mng_name,
>>>>        COALESCE(C.mng_id, '--') as mng_id,
>>>>        COALESCE(C.is_tg, '--') as is_tg,
>>>>        COALESCE(C.cust_type, '--') as cust_type,

Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread Benchao Li
The UDF looks good. Could you also paste your DDL? Then we can reproduce your
bug easily.
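
For reference, the formatting logic of the quoted `ts2Date` function can be checked outside Flink with plain Java. This is a minimal sketch, not the submitted UDF: the class name `Ts2DateSketch` and the static `convert` method are hypothetical, the `ScalarFunction` wrapper is left out, the year pattern is assumed to be `yyyy`, and `Locale.ROOT` is added so the output digits do not depend on the JVM's default locale.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Plain-Java sketch of the formatting logic only; no Flink classes involved.
final class Ts2DateSketch {

    // Returns null for null or unparseable input, mirroring the quoted UDF.
    static String convert(String timeStr, boolean dayTag) {
        if (timeStr == null) {
            return null;
        }
        // SimpleDateFormat is not thread-safe, so instances are created per call.
        SimpleDateFormat in = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.ROOT);
        Date date;
        try {
            date = in.parse(timeStr);
        } catch (ParseException e) {
            return null;
        }
        // Seconds, milliseconds and the "+0800" suffix are literal text in the pattern.
        String pattern = dayTag ? "yyyy-MM-dd" : "yyyy-MM-dd'T'HH:mm:00.000+0800";
        return new SimpleDateFormat(pattern, Locale.ROOT).format(date);
    }

    public static void main(String[] args) {
        System.out.println(convert("2020-03-01 12:01:00.234", true));  // 2020-03-01
        System.out.println(convert("2020-03-01 12:01:00.234", false)); // 2020-03-01T12:01:00.000+0800
    }
}
```

Note that `+0800` is emitted as literal text rather than computed from a time zone, so the suffix is only an accurate offset when the input timestamps are in UTC+8.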

On Sun, Mar 1, 2020 at 6:39 PM sunfulin wrote:

> Below is the code. The function converts the original field timeStr
> "2020-03-01 12:01:00.234" to the target timeStr according to dayTag.
>
> import java.text.ParseException;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class ts2Date extends ScalarFunction {
>     public ts2Date() {
>     }
>
>     public String eval(String timeStr, boolean dayTag) {
>         if (timeStr == null) {
>             return null;
>         }
>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>         Date date = new Date();
>         try {
>             date = ortSf.parse(timeStr);
>         } catch (ParseException e) {
>             e.printStackTrace();
>             return null;
>         }
>         if (dayTag) {
>             String format = "yyyy-MM-dd";
>             SimpleDateFormat sf = new SimpleDateFormat(format);
>             return sf.format(date);
>         } else {
>             String format = "yyyy-MM-dd'T'HH:mm:00.000+0800";
>             SimpleDateFormat sf = new SimpleDateFormat(format);
>             return sf.format(date);
>         }
>     }
> }
>
>
>
> At 2020-03-01 18:14:30, "Benchao Li"  wrote:
>
> Could you show how your UDF `ts2Date` is implemented?
>
> On Sun, Mar 1, 2020 at 6:05 PM sunfulin wrote:
>
>> Hi, Benchao,
>> Thanks for the reply.
>>
>> Could you provide us more information?
>> 1. what planner are you using? blink or legacy planner?
>> I am using the Blink planner. I have not tested with the legacy planner
>> because my program depends on many new features of the Blink planner.
>> 2. how do you register your UDF?
>> I just use this code: tableEnv.registerFunction("ts2Date", new
>> ts2Date()); tableEnv is a StreamTableEnvironment.
>> 3. does this have a relation with checkpointing? what if you enable
>> checkpointing and not use your udf? and disable checkpointing and use udf?
>> I don't think this is related to checkpointing. If I enable checkpointing
>> and do not use my UDF, I do not see any exception and the job submits
>> successfully. If I disable checkpointing and use the UDF, the job also
>> submits successfully.
>>
>> I have dug into this exception a lot. Maybe it is related to some
>> classloader issue. I hope for your suggestions.
>>
>>
>>
>>
>>
>> At 2020-03-01 17:54:03, "Benchao Li" wrote:
>>
>> Hi fulin,
>>
>> It seems like a bug in the code generation.
>>
>> Could you provide us more information?
>> 1. what planner are you using? blink or legacy planner?
>> 2. how do you register your UDF?
>> 3. does this have a relation with checkpointing? what if you enable
>> checkpointing and not use your udf? and disable checkpointing and use udf?
>>
>> On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:
>>
>>> Hi, guys
>>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch.
>>> In my SQL logic, I am using a UDF, ts2Date, to format date fields of the
>>> stream. However, when I add `env.enableCheckpointing(time)`, my job
>>> fails to submit and throws an exception like the following. This is really
>>> weird, because when I remove the UDF, the job submits successfully. Any
>>> suggestion is highly appreciated. My SQL logic is as follows:
>>>
>>> INSERT INTO realtime_product_sell
>>> select U.sor_pty_id,
>>>        U.entrust_date,
>>>        U.entrust_time,
>>>        U.product_code,
>>>        U.business_type,
>>>        sum(cast(U.balance as double)) as balance,
>>>        COALESCE(C.cust_name, '--') as cust_name,
>>>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
>>>        COALESCE(C.open_comp_id, '--') as open_comp_id,
>>>        COALESCE(C.org_name, '--') as org_name,
>>>        COALESCE(C.org_id, '--') as comp_name,
>>>        COALESCE(C.comp_name, '--') AS comp_name,
>>>        COALESCE(C.comp_id, '--') as comp_id,
>>>        COALESCE(C.mng_name, '--') as mng_name,
>>>        COALESCE(C.mng_id, '--') as mng_id,
>>>        COALESCE(C.is_tg, '--') as is_tg,
>>>        COALESCE(C.cust_type, '--') as cust_type,
>>>        COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
>>>        COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
>>>
>>> from (select customerNumber as sor_pty_id,
>>>              ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF
>>>              ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
>>>              fundCode as product_code,
>>>              businessType as business_type,
>>>              balance,
>>>              proctime
>>>       from lscsp_sc_order_all
>>>       where fundCode in ('007118', '007117') and businessType in ('5')) as U
>>>
>>> left join dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
>>>   on U.sor_pty_id = C.cust_id
>>> group by sor_pty_id,
>>>          entrust_date,
>>>          entrust_time,
>>>          product_code,
>>>          business_type,
>>>          COALESCE(C.cust_name, '--'),
>>>          COALESCE(C.open_comp_name, '--'),
>>>          COALESCE(C.open_comp_id, '--'),
>>>  

Re: Giving useful names to the SQL steps/operators.

2020-03-01 Thread Niels Basjes
Thanks.

On Sat, Feb 29, 2020 at 4:20 PM Yuval Itzchakov  wrote:

>
> Unfortunately, it isn't possible. You can't set names to steps like
> ordinary Java/Scala functions.
>
> On Sat, 29 Feb 2020, 17:11 Niels Basjes,  wrote:
>
>> Hi,
>>
>> I'm playing around with the streaming SQL engine in combination with the
>> UDF I wrote ( https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html ) .
>> I generated an SQL statement to extract all possible fields of my UDF
>> (i.e. many fields) and what I found is that the names of the steps in the
>> logging and the UI become ... very very large.
>>
>> In fact they become so large that it is hard to read what the step is
>> actually doing.
>>
>> As an example I get log messages like this (This is 1 logline, I added
>> newlines for readability in this email).
>>
>> 2020-02-29 14:48:13,148 WARN org.apache.flink.metrics.MetricGroup - The
>> operator name
>> select: (EventTime, useragent,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceClass') AS DeviceClass,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceName') AS DeviceName,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceBrand') AS DeviceBrand,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceCpu') AS DeviceCpu,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceCpuBits') AS
>> DeviceCpuBits,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceVersion') AS
>> DeviceVersion,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemClass') AS
>> OperatingSystemClass,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemName') AS
>> OperatingSystemName,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemNameVersion') AS
>> OperatingSystemNameVersion,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineClass') AS
>> LayoutEngineClass,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineName') AS
>> LayoutEngineName,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineVersionMajor') AS
>> LayoutEngineVersionMajor,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineNameVersionMajor')
>> AS LayoutEngineNameVersionMajor,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentClass') AS AgentClass,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentName') AS AgentName,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentVersionMajor') AS
>> AgentVersionMajor,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentNameVersionMajor') AS
>> AgentNameVersionMajor,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentLanguage') AS
>> AgentLanguage,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentLanguageCode') AS
>> AgentLanguageCode,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentInformationEmail') AS
>> AgentInformationEmail,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentInformationUrl') AS
>> AgentInformationUrl,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentSecurity') AS
>> AgentSecurity,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'WebviewAppName') AS
>> WebviewAppName,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'WebviewAppNameVersionMajor') AS
>> WebviewAppNameVersionMajor,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'Anonymized') AS Anonymized,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'HackerAttackVector') AS
>> HackerAttackVector,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'HackerToolkit') AS
>> HackerToolkit,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'KoboAffiliate') AS
>> KoboAffiliate,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'KoboPlatformId') AS
>> KoboPlatformId,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE
>> 'IECompatibilityNameVersionMajor') AS IECompatibilityNameVersionMajor,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'Carrier') AS Carrier,
>> ITEM(ParseUserAgent(useragent), _UTF-16LE'NetworkType') AS NetworkType,
>> clicks, visitors)
>> exceeded the 80 characters length limit and was truncated.
>>
>>
>> As you can see, this impacts not only the names of the steps but also the
>> metrics.
>>
>> My question is whether it is possible to specify a name for the step,
>> similar to what I can do in the Java code.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Writing a DataSet to ElasticSearch

2020-03-01 Thread Niels Basjes
Hi,

I have a job in Flink 1.10.0 which creates data that I need to write to
ElasticSearch.
Because it really is a batch job (and doing it as a stream keeps giving OOM
problems: big + unordered + groupby), I'm trying to do it as a real batch.

To write a DataSet to some output (that is not a file) an OutputFormat
implementation is needed.

public DataSink<T> output(OutputFormat<T> outputFormat)

The problem I have is that I have not been able to find an "OutputFormat"
for ElasticSearch.
Adding ES as a Sink to a DataStream is trivial because a Sink is provided
out of the box.

The only alternative I came up with is to write the output of my batch to a
file and then load that (with a stream) into ES.

What is the proper solution?
Is there an OutputFormat for ES I can use that I overlooked?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
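
One possible direction, which nothing in this thread confirms, is a small
custom OutputFormat whose writeRecord() buffers records and whose close()
flushes the remainder as a bulk request. The batching part can be sketched in
plain Java. BulkBuffer and the Consumer-based sender are hypothetical names
made up for this sketch, and the actual Elasticsearch client call is
deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects records and hands them to a bulk sender in fixed-size batches. */
final class BulkBuffer<T> {
    private final int batchSize;
    // Stand-in for a real Elasticsearch bulk call (assumption, not a real client API).
    private final Consumer<List<T>> bulkSender;
    private final List<T> pending = new ArrayList<>();

    BulkBuffer(int batchSize, Consumer<List<T>> bulkSender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.bulkSender = bulkSender;
    }

    /** Intended to be called from an OutputFormat's writeRecord(). */
    void add(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Intended to be called from an OutputFormat's close() so the last partial batch is not lost. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Hand over a copy so the sender never sees later mutations of the buffer.
        bulkSender.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

With a batch size of 2, adding three records triggers one bulk call with two
records, and the final flush() sends the remaining one.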


Flink Weekly | Weekly Community Update - 2020/03/01

2020-03-01 Thread Jingsong Lee
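Hi everyone, this is the seventh issue of Flink Weekly, compiled by Jingsong Lee. It covers recent community development progress, mailing list Q&A, and community live streams and related technical blog posts.

Community development progress

Yadong Xie's proposal to improve the Apache Flink Web UI [1] has been split into 7 sub-FLIPs. They will greatly improve the usability of the UI and help with troubleshooting and understanding runtime information. All of them are under active discussion and voting. See the demos in the mails; each sub-FLIP comes with a demo.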

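   - FLIP-98: Better back pressure detection [2]
   - FLIP-99: Make the maximum number of exceptions configurable [3]
   - FLIP-100: Add attempt (retry) information for tasks and related entities [4]
   - FLIP-101: Add a Pending Slots tab to the job detail page [5]
   - FLIP-102: Add more TaskManager metrics [6]
   - FLIP-103: Better TaskManager/JobManager log display [7]
   - FLIP-104: Add more JobManager metrics [8]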

For more information, see:

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-75-Flink-Web-UI-Improvement-Proposal-td33540.html

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-98-Better-Back-Pressure-Detection-td37893.html

[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-99-Make-Max-Exception-Configurable-tp37895.html

[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-100-Add-Attempt-Information-tp37896p37966.html

[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-101-Add-Pending-Slots-Detail-tp37897p37967.html

[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-102-Add-More-Metrics-to-TaskManager-tp37898.html

[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-103-Better-TM-JM-Log-Display-tp37899p38075.html

[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-104-Add-More-Metrics-to-Jobmanager-tp37901.html

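The discussion started by Canbin Zheng on refactoring the Kubernetes architecture is ongoing. It proposes a unified, monadic-step based orchestrator architecture that gives a better, clearer and more consistent abstraction of the Kubernetes resource construction process, for both the client side and the server side.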

[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLINK-16194-Refactor-the-Kubernetes-architecture-design-td37931.html

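Wei Zhong started a discussion on supporting Python UDFs in SQL DDL. Flink 1.10 already supports DDL for UDFs, but only for Java/Scala; this discussion aims to add support for Python UDFs.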

[10]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-106-Support-Python-UDF-in-SQL-Function-DDL-td38107.html

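Yu Li and Zhijiang Wang replied in the unaligned checkpoints discussion. The proposal is to support a new checkpointing mode that can greatly shorten the checkpoint interval, which reduces the end-to-end latency of stream processing as well as the failover time.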

[11]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-76-Unaligned-checkpoints-td33651.html

Bowen Li started the vote on the JDBC Catalog FLIP, which aims to connect to JDBC through a Catalog so that tables in external databases can be used.

[12]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-93-JDBC-catalog-and-Postgres-catalog-td38208.html

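Xiaoling He started the vote on the FLIP to refactor the TableEnvironment interface. It aims to rework methods such as sqlUpdate, offer a clearer SQL API, and avoid the user confusion caused by buffered SQL statements.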

[13]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-84-Improve-amp-Refactor-API-of-TableEnvironment-td38178.html

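Mailing list Q&A

Outlook raised a question on the user mailing list about parsing timestamps in the JSON format. Flink currently follows the RFC 3339 standard when parsing JSON, but that may not be the format users commonly have; they may use all kinds of timestamp string formats. A solution is under discussion.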

[14]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Re-TIME-TIMESTAMP-parse-in-Flink-TABLE-SQL-API-td38150.html
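
To illustrate the format mismatch, here is a small sketch that is independent of Flink's own JSON format implementation. It uses java.time's ISO_OFFSET_DATE_TIME as an approximation of RFC 3339: typical RFC 3339 strings are accepted, while the common "yyyy-MM-dd HH:mm:ss" form is rejected. The class and method names are made up for this example.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class Rfc3339Check {
    /**
     * Returns true if the string parses as an ISO-8601 offset date-time,
     * which covers the usual RFC 3339 shapes (an approximation, not a full validator).
     */
    static boolean looksLikeRfc3339(String s) {
        try {
            OffsetDateTime.parse(s, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

For instance, looksLikeRfc3339("2020-03-01T12:01:00.234+08:00") is accepted, while "2020-03-01 12:01:00" (space separator, no offset) is not.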

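Two users ran into class conflicts. This is because Flink 1.10 changed the client-side ClassLoader resolution order to child-first, so user jars must not contain Flink framework classes, such as the common Calcite, flink-planner or Hive dependencies. Users need to put the jars with the conflicting classes under flink-home/lib, or switch the resolution order to parent-first.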

[15]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html

[16]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html

猫猫 asked about using flink-jdbc-driver, which brought up that batch currently does not support UpsertTableSink, that is, neither the current JDBCUpsertSink nor HBaseUpsertSink. Support is in progress.

[17]
http://apache-flink.147419.n8.nabble.com/flink-jdbc-driver-mysql-flink1-10-0-td1763.html

claylin asked about RocksDB tuning in Flink 1.10 and is trying to resolve it through memory and thread settings.

[18]http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html

Two users hit Kerberos authentication exceptions with the Flink 1.10 Hive integration; the issue is still being investigated.

[19]
http://apache-flink.147419.n8.nabble.com/Flink-1-10-hive-kerberos-td1751.html

[20]
http://apache-flink.147419.n8.nabble.com/Hive-Source-With-Kerberos-td1688.html


Events, blog posts and others

Seth published a blog post on Apache Flink SQL DDL: "No Java Required: Configured Sources and
Sinks in SQL".

[21]https://flink.apache.org/news/2020/02/20/ddl.html

Maximilian Michels and Markos Sfikas published a blog post on the Apache Beam and Apache Flink integration: "Apache
Beam: How Beam Runs on Top of Flink".

[22]
https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html


Best,

Jingsong Lee


Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread kant kodali
Hi Benchao,

That worked! Pasting the build.gradle file here. However, this only works
for 0.11, and it needs zookeeper.connect(), which shouldn't be required. I'm
not sure why the Flink Kafka connector requires it. If I change the version
to 2.2 in the code and specify this jar

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

or

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"
// Not sure if I should use this one for Kafka >= 0.11

it doesn't work either.


buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}

sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --

    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"

    // tried doesnt work. same problem
    //flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"

    // tried doesnt work. same problem
    //flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

    //tried doesnt work. same problem
    flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"

    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"

    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
    testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
    mergeServiceFiles()
    manifest {

Re:Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin
Below is the code. The function converts the original timeStr field, e.g.
"2020-03-01 12:01:00.234", to the target time string according to dayTag.



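import org.apache.flink.table.functions.ScalarFunction;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ts2Date extends ScalarFunction {
    public ts2Date() {
    }

    // Returns the day ("yyyy-MM-dd") if dayTag is true, otherwise the timestamp
    // truncated to the minute with a fixed +0800 suffix; null on bad input.
    public String eval(String timeStr, boolean dayTag) {
        if (timeStr == null) {
            return null;
        }
        SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date date = new Date();
        try {
            date = ortSf.parse(timeStr);
        } catch (ParseException e) {
            e.printStackTrace();
            return null;
        }
        if (dayTag) {
            String format = "yyyy-MM-dd";
            SimpleDateFormat sf = new SimpleDateFormat(format);
            return sf.format(date);
        } else {
            String format = "yyyy-MM-dd'T'HH:mm:00.000+0800";
            SimpleDateFormat sf = new SimpleDateFormat(format);
            return sf.format(date);
        }
    }
}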
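
A side note on the UDF above, offered as a sketch and not as a diagnosis of
the reported compile error: SimpleDateFormat is not thread-safe and is
re-created on every call here. The same conversion can be written with the
immutable java.time formatters. Ts2DateLogic and convert are names made up for
this example; in a real job the body would sit inside eval() of the
ScalarFunction.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class Ts2DateLogic {
    // Formatters are immutable and thread-safe, so they can be shared.
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");
    // Seconds, millis and the +0800 suffix are emitted as a quoted literal,
    // mirroring the fixed suffix in the original pattern.
    private static final DateTimeFormatter MINUTE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm':00.000+0800'");

    /** Returns the day or the minute-truncated timestamp; null for null or unparsable input. */
    static String convert(String timeStr, boolean dayTag) {
        if (timeStr == null) {
            return null;
        }
        try {
            LocalDateTime ts = LocalDateTime.parse(timeStr, INPUT);
            return ts.format(dayTag ? DAY : MINUTE);
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```

For the input "2020-03-01 12:01:00.234" this yields "2020-03-01" with
dayTag = true and "2020-03-01T12:01:00.000+0800" with dayTag = false.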







At 2020-03-01 18:14:30, "Benchao Li"  wrote:

Could you show how your UDF `ts2Date` is implemented?


On Sun, Mar 1, 2020 at 6:05 PM sunfulin wrote:

Hi Benchao,
Thanks for the reply.

Could you provide us more information?
1. Which planner are you using, Blink or the legacy planner?
I am using the Blink planner. I have not tested with the legacy planner because
my program depends on many new features of the Blink planner.
2. How do you register your UDF?
With this code: tableEnv.registerFunction("ts2Date", new ts2Date());
tableEnv is a StreamTableEnvironment.
3. Is this related to checkpointing? What happens if you enable
checkpointing without your UDF, or disable checkpointing and use the UDF?
I don't think this is related to checkpointing. If I enable checkpointing and
do not use my UDF, I see no exception and the job submits successfully. If I
disable checkpointing and use the UDF, the job also submits successfully.

I have dug into this exception a lot. Maybe it is related to a classloader
issue. I hope you have a suggestion.











At 2020-03-01 17:54:03, "Benchao Li" wrote:

Hi fulin,

It seems like a bug in the code generation.

Could you provide us more information?
1. Which planner are you using, Blink or the legacy planner?
2. How do you register your UDF?
3. Is this related to checkpointing? What happens if you enable
checkpointing without your UDF, or disable checkpointing and use the UDF?

On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:

Hi, guys
I am running a Flink 1.10 SQL job with an UpsertStreamSink to ElasticSearch. In
my SQL logic, I use a UDF, ts2Date, to format date fields of the stream.
However, when I add `env.enableCheckpointing(time)`, the job fails to
submit and throws the exception below. This is really weird, because when I
remove the UDF, the job submits successfully. Any suggestion is highly
appreciated. My SQL logic is as follows:



INSERT INTO realtime_product_sell
select U.sor_pty_id,
   U.entrust_date,
   U.entrust_time,
   U.product_code,
   U.business_type,
   sum(cast(U.balance as double)) as balance,
   COALESCE(C.cust_name, '--') as cust_name,
   COALESCE(C.open_comp_name, '--') AS open_comp_name,
   COALESCE(C.open_comp_id, '--') as open_comp_id,
   COALESCE(C.org_name,'--') as org_name,
   COALESCE(C.org_id,'--') as comp_name,
   COALESCE(C.comp_name, '--') AS comp_name,
   COALESCE(C.comp_id,'--') as comp_id,
   COALESCE(C.mng_name,'--') as mng_name,
   COALESCE(C.mng_id,'--') as mng_id,
   COALESCE(C.is_tg,'--') as is_tg,
   COALESCE(C.cust_type,'--') as cust_type,
   COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
   COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF 
   ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
fundCode as product_code,
businessType as business_type,
balance,
proctime
  from lscsp_sc_order_all where fundCode in ('007118','007117') and 
businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
entrust_date,
entrust_time,
product_code,
business_type,
COALESCE(C.cust_name, '--'),
COALESCE(C.open_comp_name, '--'),
COALESCE(C.open_comp_id, '--'),
COALESCE(C.org_name,'--'),
COALESCE(C.org_id,'--'),
COALESCE(C.comp_name, '--'),
COALESCE(C.comp_id,'--'),
COALESCE(C.mng_name,'--'),
COALESCE(C.mng_id,'--'),
COALESCE(C.is_tg,'--'),
COALESCE(C.cust_type,'--'),
COALESCE(C.avg_tot_aset_y365, 0.00),
COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. 

Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread Benchao Li
Could you show how your UDF `ts2Date` is implemented?

On Sun, Mar 1, 2020 at 6:05 PM sunfulin wrote:

> Hi Benchao,
> Thanks for the reply.
>
> Could you provide us more information?
> 1. Which planner are you using, Blink or the legacy planner?
> I am using the Blink planner. I have not tested with the legacy planner
> because my program depends on many new features of the Blink planner.
> 2. How do you register your UDF?
> With this code: tableEnv.registerFunction("ts2Date", new ts2Date());
> tableEnv is a StreamTableEnvironment.
> 3. Is this related to checkpointing? What happens if you enable
> checkpointing without your UDF, or disable checkpointing and use the UDF?
> I don't think this is related to checkpointing. If I enable checkpointing
> and do not use my UDF, I see no exception and the job submits
> successfully. If I disable checkpointing and use the UDF, the job also
> submits successfully.
>
> I have dug into this exception a lot. Maybe it is related to a
> classloader issue. I hope you have a suggestion.
>
> At 2020-03-01 17:54:03, "Benchao Li" wrote:
>
> Hi fulin,
>
> It seems like a bug in the code generation.
>
> Could you provide us more information?
> 1. Which planner are you using, Blink or the legacy planner?
> 2. How do you register your UDF?
> 3. Is this related to checkpointing? What happens if you enable
> checkpointing without your UDF, or disable checkpointing and use the UDF?
>
> On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:
>
>> Hi, guys
>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to ElasticSearch.
>> In my SQL logic, I use a UDF, ts2Date, to format date fields of the
>> stream. However, when I add `env.enableCheckpointing(time)`, the job
>> fails to submit and throws the exception below. This is really weird,
>> because when I remove the UDF, the job submits successfully. Any
>> suggestion is highly appreciated. My SQL logic is as follows:
>>
>> INSERT INTO realtime_product_sell
>> select U.sor_pty_id,
>>        U.entrust_date,
>>        U.entrust_time,
>>        U.product_code,
>>        U.business_type,
>>        sum(cast(U.balance as double)) as balance,
>>        COALESCE(C.cust_name, '--') as cust_name,
>>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
>>        COALESCE(C.open_comp_id, '--') as open_comp_id,
>>        COALESCE(C.org_name,'--') as org_name,
>>        COALESCE(C.org_id,'--') as comp_name,
>>        COALESCE(C.comp_name, '--') AS comp_name,
>>        COALESCE(C.comp_id,'--') as comp_id,
>>        COALESCE(C.mng_name,'--') as mng_name,
>>        COALESCE(C.mng_id,'--') as mng_id,
>>        COALESCE(C.is_tg,'--') as is_tg,
>>        COALESCE(C.cust_type,'--') as cust_type,
>>        COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
>>        COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
>> from
>> (select customerNumber as sor_pty_id,
>>         ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF
>>         ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
>>         fundCode as product_code,
>>         businessType as business_type,
>>         balance,
>>         proctime
>>    from lscsp_sc_order_all where fundCode in ('007118','007117') and
>>    businessType in ('5') ) as U
>> left join
>> dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
>> on U.sor_pty_id = C.cust_id
>> group by sor_pty_id,
>> entrust_date,
>> entrust_time,
>> product_code,
>> business_type,
>> COALESCE(C.cust_name, '--'),
>> COALESCE(C.open_comp_name, '--'),
>> COALESCE(C.open_comp_id, '--'),
>> COALESCE(C.org_name,'--'),
>> COALESCE(C.org_id,'--'),
>> COALESCE(C.comp_name, '--'),
>> COALESCE(C.comp_id,'--'),
>> COALESCE(C.mng_name,'--'),
>> COALESCE(C.mng_id,'--'),
>> COALESCE(C.is_tg,'--'),
>> COALESCE(C.cust_type,'--'),
>> COALESCE(C.avg_tot_aset_y365, 0.00),
>> COALESCE(C.avg_aset_create_y, 0.00)
>>
>> 2020-03-01 17:22:06,504 ERROR
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled
>> exception.
>> org.apache.flink.util.FlinkRuntimeException:
>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
>> be compiled. This is a bug. Please file an issue.
>> at
>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
>> at
>> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
>> at
>> org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
>> at
>> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
>> at
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
>> at
>> 

Re:Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin
Hi Benchao,
Thanks for the reply.

Could you provide us more information?
1. Which planner are you using, Blink or the legacy planner?
I am using the Blink planner. I have not tested with the legacy planner because
my program depends on many new features of the Blink planner.
2. How do you register your UDF?
With this code: tableEnv.registerFunction("ts2Date", new ts2Date());
tableEnv is a StreamTableEnvironment.
3. Is this related to checkpointing? What happens if you enable
checkpointing without your UDF, or disable checkpointing and use the UDF?
I don't think this is related to checkpointing. If I enable checkpointing and
do not use my UDF, I see no exception and the job submits successfully. If I
disable checkpointing and use the UDF, the job also submits successfully.

I have dug into this exception a lot. Maybe it is related to a classloader
issue. I hope you have a suggestion.











At 2020-03-01 17:54:03, "Benchao Li" wrote:

Hi fulin,

It seems like a bug in the code generation.

Could you provide us more information?
1. Which planner are you using, Blink or the legacy planner?
2. How do you register your UDF?
3. Is this related to checkpointing? What happens if you enable
checkpointing without your UDF, or disable checkpointing and use the UDF?

On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:

Hi, guys
I am running a Flink 1.10 SQL job with an UpsertStreamSink to ElasticSearch. In
my SQL logic, I use a UDF, ts2Date, to format date fields of the stream.
However, when I add `env.enableCheckpointing(time)`, the job fails to
submit and throws the exception below. This is really weird, because when I
remove the UDF, the job submits successfully. Any suggestion is highly
appreciated. My SQL logic is as follows:



INSERT INTO realtime_product_sell
select U.sor_pty_id,
   U.entrust_date,
   U.entrust_time,
   U.product_code,
   U.business_type,
   sum(cast(U.balance as double)) as balance,
   COALESCE(C.cust_name, '--') as cust_name,
   COALESCE(C.open_comp_name, '--') AS open_comp_name,
   COALESCE(C.open_comp_id, '--') as open_comp_id,
   COALESCE(C.org_name,'--') as org_name,
   COALESCE(C.org_id,'--') as comp_name,
   COALESCE(C.comp_name, '--') AS comp_name,
   COALESCE(C.comp_id,'--') as comp_id,
   COALESCE(C.mng_name,'--') as mng_name,
   COALESCE(C.mng_id,'--') as mng_id,
   COALESCE(C.is_tg,'--') as is_tg,
   COALESCE(C.cust_type,'--') as cust_type,
   COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
   COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF 
   ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
fundCode as product_code,
businessType as business_type,
balance,
proctime
  from lscsp_sc_order_all where fundCode in ('007118','007117') and 
businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
entrust_date,
entrust_time,
product_code,
business_type,
COALESCE(C.cust_name, '--'),
COALESCE(C.open_comp_name, '--'),
COALESCE(C.open_comp_id, '--'),
COALESCE(C.org_name,'--'),
COALESCE(C.org_id,'--'),
COALESCE(C.comp_name, '--'),
COALESCE(C.comp_id,'--'),
COALESCE(C.mng_name,'--'),
COALESCE(C.mng_id,'--'),
COALESCE(C.is_tg,'--'),
COALESCE(C.cust_type,'--'),
COALESCE(C.avg_tot_aset_y365, 0.00),
COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
    at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
    at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
    at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at 

Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread Benchao Li
Hi fulin,

It seems like a bug in the code generation.

Could you provide more information?
1. Which planner are you using, Blink or the legacy planner?
2. How do you register your UDF?
3. Is this related to checkpointing? What happens if you enable checkpointing
without the UDF, or disable checkpointing and use the UDF?

On Sun, Mar 1, 2020 at 5:41 PM sunfulin wrote:

> Hi, guys
> I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch. In
> my SQL logic, I use a UDF, ts2Date, to format the date fields of the stream.
> However, when I add `env.enableCheckpointing(time)`, the job fails to
> submit and throws the exception below. This is really weird, because when I
> remove the UDF, the job submits successfully. Any suggestion is highly
> appreciated. My SQL logic is as follows:
>
> INSERT INTO realtime_product_sell
> select U.sor_pty_id,
>        U.entrust_date,
>        U.entrust_time,
>        U.product_code,
>        U.business_type,
>        sum(cast(U.balance as double)) as balance,
>        COALESCE(C.cust_name, '--') as cust_name,
>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
>        COALESCE(C.open_comp_id, '--') as open_comp_id,
>        COALESCE(C.org_name,'--') as org_name,
>        COALESCE(C.org_id,'--') as comp_name,
>        COALESCE(C.comp_name, '--') AS comp_name,
>        COALESCE(C.comp_id,'--') as comp_id,
>        COALESCE(C.mng_name,'--') as mng_name,
>        COALESCE(C.mng_id,'--') as mng_id,
>        COALESCE(C.is_tg,'--') as is_tg,
>        COALESCE(C.cust_type,'--') as cust_type,
>        COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
>        COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
> from (select customerNumber as sor_pty_id,
>              ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF
>              ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
>              fundCode as product_code,
>              businessType as business_type,
>              balance,
>              proctime
>       from lscsp_sc_order_all
>       where fundCode in ('007118','007117') and businessType in ('5')) as U
> left join dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
> on U.sor_pty_id = C.cust_id
> group by sor_pty_id,
>          entrust_date,
>          entrust_time,
>          product_code,
>          business_type,
>          COALESCE(C.cust_name, '--'),
>          COALESCE(C.open_comp_name, '--'),
>          COALESCE(C.open_comp_id, '--'),
>          COALESCE(C.org_name,'--'),
>          COALESCE(C.org_id,'--'),
>          COALESCE(C.comp_name, '--'),
>          COALESCE(C.comp_id,'--'),
>          COALESCE(C.mng_name,'--'),
>          COALESCE(C.mng_id,'--'),
>          COALESCE(C.is_tg,'--'),
>          COALESCE(C.cust_type,'--'),
>          COALESCE(C.avg_tot_aset_y365, 0.00),
>          COALESCE(C.avg_aset_create_y, 0.00)
>
> 2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
> org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
>     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
>     at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at 

Get Tumbling Window Top-K using SQL

2020-03-01 Thread Lu Weizheng
Hi,

I found a question on StackOverflow
(https://stackoverflow.com/questions/49191326/flink-stream-sql-order-by)
about how to get Top-K using Flink SQL; the answer was written by Fabian back
in 2018.
The main idea is to use RANK to get the Top-K of field 'a':

SELECT a, b, c
FROM (
  SELECT
    a, b, c,
    RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank
  FROM yourTable)
WHERE rank <= 10

Is there a better way to get the Top-K items of a tumbling window now?
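
To make the intended result concrete, here is a small in-memory sketch of tumbling-window Top-K semantics in plain Java. It is not Flink code and says nothing about how Flink plans or executes such a query. The assumptions are mine: rows carry an epoch-millisecond timestamp t and an int value a, windows are aligned to multiples of the window size, "top" means the largest a (the quoted query orders ascending), and exactly k rows are kept per window (ROW_NUMBER-like, whereas RANK would also keep ties). The names TumblingTopK, Row and topK are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TumblingTopK {
    /** One input row: event time in epoch milliseconds plus the value to rank by. */
    public static final class Row {
        public final long t;
        public final int a;

        public Row(long t, int a) {
            this.t = t;
            this.a = a;
        }

        @Override
        public String toString() {
            return "(t=" + t + ", a=" + a + ")";
        }
    }

    /** Assigns each row to a tumbling window and keeps the k rows with the largest a per window. */
    public static Map<Long, List<Row>> topK(List<Row> rows, long windowMillis, int k) {
        // Step 1: bucket rows by window start (the timestamp rounded down to the window size).
        Map<Long, List<Row>> byWindow = new TreeMap<>();
        for (Row r : rows) {
            long windowStart = Math.floorDiv(r.t, windowMillis) * windowMillis;
            byWindow.computeIfAbsent(windowStart, w -> new ArrayList<>()).add(r);
        }
        // Step 2: within each window, sort by a descending and keep the first k rows.
        Map<Long, List<Row>> result = new TreeMap<>();
        for (Map.Entry<Long, List<Row>> e : byWindow.entrySet()) {
            List<Row> top = e.getValue().stream()
                    .sorted(Comparator.comparingInt((Row r) -> r.a).reversed())
                    .limit(k)
                    .collect(Collectors.toList());
            result.put(e.getKey(), top);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1_000L, 5));
        rows.add(new Row(2_000L, 9));
        rows.add(new Row(59_000L, 7));
        rows.add(new Row(61_000L, 3));
        // One-minute windows, top 2 per window.
        topK(rows, 60_000L, 2).forEach((start, top) -> System.out.println(start + " -> " + top));
    }
}
```

The two steps mirror the usual SQL shape for this problem: first assign rows to a window, then rank within each window and filter on the rank.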

Also, the documentation page on dynamic tables may need an update:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/dynamic_tables.html

On that page, I don't understand why the query below refers to a field
'lastLogin', while the subquery only defines 'lastAction':


SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

Thanks!


Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin
Hi, guys
I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch. In my
SQL logic, I use a UDF, ts2Date, to format the date fields of the stream.
However, when I add `env.enableCheckpointing(time)`, the job fails to
submit and throws the exception below. This is really weird, because when I
remove the UDF, the job submits successfully. Any suggestion is highly
appreciated. My SQL logic is as follows:



INSERT INTO realtime_product_sell
select U.sor_pty_id,
   U.entrust_date,
   U.entrust_time,
   U.product_code,
   U.business_type,
   sum(cast(U.balance as double)) as balance,
   COALESCE(C.cust_name, '--') as cust_name,
   COALESCE(C.open_comp_name, '--') AS open_comp_name,
   COALESCE(C.open_comp_id, '--') as open_comp_id,
   COALESCE(C.org_name,'--') as org_name,
   COALESCE(C.org_id,'--') as comp_name,
   COALESCE(C.comp_name, '--') AS comp_name,
   COALESCE(C.comp_id,'--') as comp_id,
   COALESCE(C.mng_name,'--') as mng_name,
   COALESCE(C.mng_id,'--') as mng_id,
   COALESCE(C.is_tg,'--') as is_tg,
   COALESCE(C.cust_type,'--') as cust_type,
   COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
   COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
ts2Date(`lastUpdateTime`, true) as entrust_date, -- the UDF 
   ts2Date(`lastUpdateTime`, false) as entrust_time, -- the UDF
fundCode as product_code,
businessType as business_type,
balance,
proctime
  from lscsp_sc_order_all where fundCode in ('007118','007117') and 
businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
entrust_date,
entrust_time,
product_code,
business_type,
COALESCE(C.cust_name, '--'),
COALESCE(C.open_comp_name, '--'),
COALESCE(C.open_comp_id, '--'),
COALESCE(C.org_name,'--'),
COALESCE(C.org_id,'--'),
COALESCE(C.comp_name, '--'),
COALESCE(C.comp_id,'--'),
COALESCE(C.mng_name,'--'),
COALESCE(C.mng_id,'--'),
COALESCE(C.is_tg,'--'),
COALESCE(C.cust_type,'--'),
COALESCE(C.avg_tot_aset_y365, 0.00),
COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
    at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
    at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
    at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread Benchao Li
I don't know how Gradle works, but in Maven, packaging dependencies into
one fat jar requires specifying how SPI property files (the files under
META-INF/services) should be merged.

Could you check that your final jar contains the correct resource file?
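
One way to do that check is to open the fat jar and print what its service file lists. The sketch below is a plain-Java helper, not part of Flink; the class name ServiceFileCheck is hypothetical, and the entry name META-INF/services/org.apache.flink.table.factories.TableFactory is my assumption about which SPI file matters for table connectors in Flink 1.10, so adjust it if your setup differs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ServiceFileCheck {
    // Assumed SPI file for table factories; change this if you need to inspect another service.
    public static final String TABLE_FACTORY_SPI =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";

    /** Returns the implementation class names listed in the given service file, or an empty list. */
    public static List<String> readServiceEntries(String jarPath, String entryName) throws IOException {
        List<String> factories = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(entryName);
            if (entry == null) {
                return factories; // the service file is missing from the jar
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    line = line.trim();
                    // Skip blank lines and comment lines.
                    if (!line.isEmpty() && !line.startsWith("#")) {
                        factories.add(line);
                    }
                }
            }
        }
        return factories;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java ServiceFileCheck <path-to-fat-jar>");
            return;
        }
        List<String> factories = readServiceEntries(args[0], TABLE_FACTORY_SPI);
        if (factories.isEmpty()) {
            System.out.println("No entries found in " + TABLE_FACTORY_SPI);
        }
        for (String factory : factories) {
            System.out.println(factory);
        }
    }
}
```

If the list contains the factories of only one dependency, the service files were probably overwritten rather than merged. As far as I know, the Maven Shade plugin handles this with its ServicesResourceTransformer, and the Gradle Shadow plugin has a mergeServiceFiles() option on the shadowJar task.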

On Sun, Mar 1, 2020 at 5:25 PM godfrey he wrote:

> I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of
> `flink-connector-kafka_2.11`.
>
> Bests,
> Godfrey
>
> On Sun, Mar 1, 2020 at 5:15 PM kant kodali wrote:
>
>> The dependency was already there. Below is my build.gradle. I also
>> checked the Kafka version, and it looks like the jar
>>
>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>
>> pulls in kafka-clients version 2.2.0. So I changed my code to version
>> 2.2.0, and the same problem persists.
>>
>> buildscript {
>> repositories {
>> jcenter() // this applies only to the Gradle 'Shadow' plugin
>> }
>> dependencies {
>> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>> }
>> }
>>
>> plugins {
>> id 'java'
>> id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> ext {
>> javaVersion = '1.8'
>> flinkVersion = '1.10.0'
>> scalaBinaryVersion = '2.11'
>> slf4jVersion = '1.7.7'
>> log4jVersion = '1.2.17'
>> }
>>
>>
>> sourceCompatibility = javaVersion
>> targetCompatibility = javaVersion
>> tasks.withType(JavaCompile) {
>> options.encoding = 'UTF-8'
>> }
>>
>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>
>> // declare where to find the dependencies of your project
>> repositories {
>> mavenCentral()
>> maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>> }
>>
>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then 
>> we could not run code
>> // in the IDE or with "gradle run". We also cannot exclude transitive 
>> dependencies from the
>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>> // -> Explicitly define the // libraries we want to be included in the 
>> "flinkShadowJar" configuration!
>> configurations {
>> flinkShadowJar // dependencies which go into the shadowJar
>>
>> // always exclude these (also from transitive dependencies) since they 
>> are provided by Flink
>> flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
>> 'jsr305'
>> flinkShadowJar.exclude group: 'org.slf4j'
>> flinkShadowJar.exclude group: 'log4j'
>> }
>>
>> // declare the dependencies for your production and test code
>> dependencies {
>> // --
>> // Compile-time dependencies that should NOT be part of the
>> // shadow jar and are provided in the lib folder of Flink
>> // --
>> compile "org.apache.flink:flink-java:${flinkVersion}"
>> compile 
>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>
>> // --
>> // Dependencies that should be part of the shadow jar, e.g.
>> // connectors. These must be in the flinkShadowJar configuration!
>> // --
>>
>> compile "org.apache.flink:flink-java:${flinkVersion}"
>> compile 
>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>
>> flinkShadowJar 
>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>> compileOnly 
>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>> compileOnly 
>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>> flinkShadowJar 
>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>> flinkShadowJar 
>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>
>>
>> compile "log4j:log4j:${log4jVersion}"
>> compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>
>> // Add test dependencies here.
>> // testCompile "junit:junit:4.12"
>> testImplementation 
>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>> testImplementation 
>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>> }
>>
>> // make compileOnly dependencies available for tests:
>> sourceSets {
>> main.compileClasspath += configurations.flinkShadowJar
>> main.runtimeClasspath += configurations.flinkShadowJar
>>
>> test.compileClasspath += configurations.flinkShadowJar
>> test.runtimeClasspath += configurations.flinkShadowJar
>>
>> javadoc.classpath += configurations.flinkShadowJar
>> }
>>
>> run.classpath = sourceSets.main.runtimeClasspath
>>
>> jar {

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread godfrey he
I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of
`flink-connector-kafka_2.11`.

Bests,
Godfrey

On Sun, Mar 1, 2020 at 5:15 PM kant kodali wrote:

> The dependency was already there. Below is my build.gradle. Also I checked
> the kafka version and looks like the jar
>
> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>
> downloads kafka-clients version 2.2.0. So I changed my code to version
> 2.2.0 and same problem persists.
>
> buildscript {
> repositories {
> jcenter() // this applies only to the Gradle 'Shadow' plugin
> }
> dependencies {
> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
> }
> }
>
> plugins {
> id 'java'
> id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> ext {
> javaVersion = '1.8'
> flinkVersion = '1.10.0'
> scalaBinaryVersion = '2.11'
> slf4jVersion = '1.7.7'
> log4jVersion = '1.2.17'
> }
>
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
> options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
> mavenCentral()
> maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
> configurations {
> flinkShadowJar // dependencies which go into the shadowJar
>
> // always exclude these (also from transitive dependencies) since they are provided by Flink
> flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
> flinkShadowJar.exclude group: 'org.slf4j'
> flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
> // --
> // Compile-time dependencies that should NOT be part of the
> // shadow jar and are provided in the lib folder of Flink
> // --
> compile "org.apache.flink:flink-java:${flinkVersion}"
> compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
> // --
> // Dependencies that should be part of the shadow jar, e.g.
> // connectors. These must be in the flinkShadowJar configuration!
> // --
>
> compile "org.apache.flink:flink-java:${flinkVersion}"
> compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>
> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
> compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
> compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
>
> compile "log4j:log4j:${log4jVersion}"
> compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
> // Add test dependencies here.
> // testCompile "junit:junit:4.12"
> testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
> testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
> main.compileClasspath += configurations.flinkShadowJar
> main.runtimeClasspath += configurations.flinkShadowJar
>
> test.compileClasspath += configurations.flinkShadowJar
> test.runtimeClasspath += configurations.flinkShadowJar
>
> javadoc.classpath += configurations.flinkShadowJar
> }
>
> run.classpath = sourceSets.main.runtimeClasspath
>
> jar {
> manifest {
> attributes 'Built-By': System.getProperty('user.name'),
> 'Build-Jdk': System.getProperty('java.version')
> }
> }
>
> shadowJar {
> configurations = [project.configurations.flinkShadowJar]
> }
>
>
>
>
> On Sun, Mar 1, 2020 at 12:50 AM godfrey he  wrote:
>
>> hi kant,
>>
>> > Also why do I need to convert to DataStream to print the rows of a
>> table? Why 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread kant kodali
The dependency was already there; my build.gradle is below. I also checked
the Kafka version, and it looks like the jar

flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"

downloads kafka-clients version 2.2.0. I changed my code to version
2.2.0, and the same problem persists.

buildscript {
repositories {
jcenter() // this applies only to the Gradle 'Shadow' plugin
}
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
}
}

plugins {
id 'java'
id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
javaVersion = '1.8'
flinkVersion = '1.10.0'
scalaBinaryVersion = '2.11'
slf4jVersion = '1.7.7'
log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
mavenCentral()
maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
configurations {
flinkShadowJar // dependencies which go into the shadowJar

// always exclude these (also from transitive dependencies) since they are provided by Flink
flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
flinkShadowJar.exclude group: 'org.slf4j'
flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
// --
// Compile-time dependencies that should NOT be part of the
// shadow jar and are provided in the lib folder of Flink
// --
compile "org.apache.flink:flink-java:${flinkVersion}"
compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

// --
// Dependencies that should be part of the shadow jar, e.g.
// connectors. These must be in the flinkShadowJar configuration!
// --

compile "org.apache.flink:flink-java:${flinkVersion}"
compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"

flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"


compile "log4j:log4j:${log4jVersion}"
compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

// Add test dependencies here.
// testCompile "junit:junit:4.12"
testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
main.compileClasspath += configurations.flinkShadowJar
main.runtimeClasspath += configurations.flinkShadowJar

test.compileClasspath += configurations.flinkShadowJar
test.runtimeClasspath += configurations.flinkShadowJar

javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}

shadowJar {
configurations = [project.configurations.flinkShadowJar]
}




On Sun, Mar 1, 2020 at 12:50 AM godfrey he  wrote:

> hi kant,
>
> > Also why do I need to convert to DataStream to print the rows of a
> table? Why not have a print method in the Table itself?
> Flink 1.10 introduces a utility class named TableUtils to convert a Table
> to List, this utility class is mainly used for demonstration or
> testing and is only applicable for *small batch jobs* and small finite *append
> only stream jobs*.  code like:
> Table table = tEnv.sqlQuery("select ...");
> List result = TableUtils.collectToList(table);
> result.
>
> currently, 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread godfrey he
hi kant,

> Also why do I need to convert to DataStream to print the rows of a table?
> Why not have a print method in the Table itself?
Flink 1.10 introduces a utility class named TableUtils that converts a Table
to a List<Row>. This utility class is mainly used for demonstration or
testing, and is only applicable to *small batch jobs* and small, finite
*append-only stream jobs*. Code like:
Table table = tEnv.sqlQuery("select ...");
List<Row> result = TableUtils.collectToList(table);
result.

Currently, we are planning to implement Table#collect [1]; after that,
Table#head and Table#print may also be introduced soon.

>  The program finished with the following exception:
Please make sure that the Kafka version in the Test class and the Kafka
version in the pom dependency are the same. I tested your code successfully.
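
To illustrate why the two versions have to agree: as far as I understand it,
Flink selects a table factory by comparing the properties the job requests
with the context each factory on the classpath requires, and fails when none
of them match. The following is only a minimal, self-contained sketch of that
idea in plain Java. It does not call any Flink API. The class
FactoryMatchSketch and its methods are hypothetical names, and the assumption
that the universal connector jar (flink-connector-kafka_2.11) requires
connector.version=universal while a 0.11 connector jar requires
connector.version=0.11 should be checked against the Flink 1.10 connector
documentation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of context-based factory matching.
// It is NOT Flink code; it only mimics the comparison of requested
// properties against a factory's required context.
public class FactoryMatchSketch {

    // A factory matches only if every required key is present in the
    // requested properties with exactly the required value.
    public static boolean matches(Map<String, String> requiredContext,
                                  Map<String, String> requested) {
        for (Map.Entry<String, String> entry : requiredContext.entrySet()) {
            if (!entry.getValue().equals(requested.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Assumed required context of a Kafka table factory for a given version.
    public static Map<String, String> kafkaContext(String version) {
        Map<String, String> context = new LinkedHashMap<>();
        context.put("connector.type", "kafka");
        context.put("connector.version", version);
        context.put("connector.property-version", "1");
        return context;
    }

    // The connector-related properties requested by the job, as listed in
    // the error message in this thread.
    public static Map<String, String> requestedProperties() {
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.property-version", "1");
        requested.put("connector.topic", "test-topic1");
        requested.put("connector.type", "kafka");
        requested.put("connector.version", "0.11");
        requested.put("format.type", "csv");
        requested.put("update-mode", "append");
        return requested;
    }

    public static void main(String[] args) {
        Map<String, String> requested = requestedProperties();
        // Assumed context of the universal connector: version does not match.
        System.out.println("universal matches: "
                + matches(kafkaContext("universal"), requested));
        // Assumed context of a 0.11 connector: all required keys match.
        System.out.println("0.11 matches: "
                + matches(kafkaContext("0.11"), requested));
    }
}
```

Under these assumptions, only the 0.11 context matches the requested
properties, so either the connector jar or the version requested in the code
would have to change until the two agree.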

Bests,
Godfrey

[1] https://issues.apache.org/jira/browse/FLINK-14807


On Sun, Mar 1, 2020 at 4:44 PM Benchao Li wrote:

> Hi kant,
>
> CSV format is an independent module, you need to add it as your
> dependency.
>
> 
> <dependency>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-csv</artifactId>
>    <version>${flink.version}</version>
> </dependency>
> 
>
>
> On Sun, Mar 1, 2020 at 3:43 PM kant kodali wrote:
>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>> at Test.main(Test.java:34)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> ... 8 more
>> Caused 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread Benchao Li
Hi kant,

The CSV format is an independent module; you need to add it as a dependency.


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-csv</artifactId>
   <version>${flink.version}</version>
</dependency>



On Sun, Mar 1, 2020 at 3:43 PM kant kodali wrote:

> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> at Test.main(Test.java:34)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 8 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>
> Reason: Required context properties mismatch.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
>
> The following properties are requested:
> connector.property-version=1
> connector.topic=test-topic1
> connector.type=kafka
> connector.version=0.11
> format.property-version=1
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=f0
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at
>