Re: Queryable state formal release plan

2019-05-20 Thread Aljoscha Krettek
Hi,

Currently no committers (or PMC members) are focusing on the queryable state 
feature, which probably means that not much is going to happen there in the 
near future. However, there is some discussion in the wider community about 
the development of QS: 
https://lists.apache.org/thread.html/ad82be36f36ab674764b71c1319121b524c0052b3b14203c18557c11@%3Cdev.flink.apache.org%3E
 


Best,
Aljoscha

> On 20. May 2019, at 08:08, Praveen Chandna  
> wrote:
> 
> Hi 
>  
> As Queryable State is in beta, can you please confirm the plan for the formal 
> release of this feature?
> Is there a timeline for when it would be added to Flink?
>  
> Thanks !!!
> 
>  
> /// Regards
> Praveen Chandna
> Product Owner,
> Mobile +91 9873597204  |  ECN: 2864



monitor finished files on a Continuous Reader

2019-05-20 Thread Hanan Yehudai
Hi
I'm looking for a way to delete / rename files that are done loading.

I'm using env.readFile, monitoring a directory for all new files. Once a file 
has been fully read, I would like to delete it.
Is there a way to monitor the closed splits in the continuous reader? Is there 
a different way to do this?


Regards,
Hanan



Re: FW: Constant backpressure on flink job

2019-05-20 Thread Dawid Wysakowicz
Hi,

Have you checked the logs for exceptions? Could you share the logs with
us? Have you tried switching to e.g. FSStateBackend or disabling the
incremental checkpoints on 1.7.2 to see in what configuration the
problem occurs?

Best,

Dawid

On 20/05/2019 09:09, Georgi Stoyanov wrote:
>
> Hi guys,
>
>  
>
> Could you help us find why our checkpoint fails so often (we don’t
> have huge state at all)?
>
> We are experiencing that problem for around 2 months just after
> upgrading to 1.7.2 and we configured incremental checkpointing. Could
> this be the reason?
>
>  
>
> Regards,
>
> Georgi Stoyanov
>
>  
>
> *From:*Monika Hristova 
> *Sent:* Tuesday, May 7, 2019 3:55 PM
> *To:* Dawid Wysakowicz ; user@flink.apache.org
> *Subject:* RE: Constant backpressure on flink job
>
>  
>
> Hello Dawid,
>
>  
>
> Thank you for your response.
>
> What I actually noticed is that first checkpoint starts failing with
> exception “Checkpoint expired before completing”. It keeps failing and
> then (after an hour or so ) the backpressure occurs. This is my job
> graph(please see attached files JobGraph_1) with marked in red
> problematic operators. After this the backpressure occurs and
> operators marked in red are those with high backpressure (JobGraph_2).
>
>  
>
> Best Regards,
>
> Monika
>
>  
>
>  
>
> *From:*Dawid Wysakowicz  >
> *Sent:* Thursday, April 25, 2019 11:14 AM
> *To:* Monika Hristova  >; user@flink.apache.org
> 
> *Subject:* Re: Constant backpressure on flink job
>
>  
>
> Hi Monika,
>
> I would start with identifying the operator that causes backpressure.
> More information how to monitor backpressure you can find here in the
> docs[1]. You might also be interested in Seth's (cc'ed) webinar[2],
> where he also talks how to debug backpressure.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure
>
> [2]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
>
> On 22/04/2019 17:44, Monika Hristova wrote:
>
> Hello,
>
>  
>
> We are experiencing regular backpressure (at least once a week). I
> would like to ask how we can fix it.
>
>  
>
> Currently our configurations are:
>
> *vi /usr/lib/flink/conf/flink-conf.yaml*
>
> # Settings applied by Cloud Dataproc initialization action
>
> jobmanager.rpc.address: bonusengine-prod-m-0
>
> jobmanager.heap.mb: 4096
>
> jobmanager.rpc.port: 6123
>
> taskmanager.heap.mb: 4096
>
> taskmanager.memory.preallocate: false
>
> taskmanager.numberOfTaskSlots: 8
>
> #taskmanager.network.numberOfBuffers: 21952 # legacy deprecated
>
> taskmanager.network.memory.fraction: 0.9
>
> taskmanager.network.memory.min: 67108864
>
> taskmanager.network.memory.max: 1073741824
>
> taskmanager.memory.segment-size: 65536
>
> parallelism.default: 52
>
> web.port: 8081
>
> web.timeout: 12
>
> heartbeat.interval: 1
>
> heartbeat.timeout: 10
>
> yarn.application-attempts: 10
>
> high-availability: zookeeper
>
> high-availability.zookeeper.quorum:
> 
> bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
>
> high-availability.zookeeper.path.root: /flink
>
> #high-availability.zookeeper.storageDir:
> hdfs:///flink/recovery # legacy deprecated
>
> high-availability.storageDir: hdfs:///flink/recovery
>
> flink.partition-discovery.interval-millis=6
>
> fs.hdfs.hadoopconf: /etc/hadoop/conf
>
> state.backend: rocksdb
>
> state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
>
> state.savepoints.dir: hdfs:///bonusengine/savepoints
>
> metrics.reporters: stsd
>
> metrics.reporter.stsd.class:
> org.apache.flink.metrics.statsd.StatsDReporter
>
> metrics.reporter.stsd.host: 127.0.0.1
>
> metrics.reporter.stsd.port: 8125
>
> zookeeper.sasl.disable: true
>
> yarn.reallocate-failed: true
>
> yarn.maximum-failed-containers: 32
>
> web.backpressure.refresh-interval: 6
>
> web.backpressure.num-samples: 100
>
> web.backpressure.delay-between-samples: 50
>
>  
>
> with Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB and slaves ->
> 16 vCPUs, 60 GB with yarn configuration(*see attached file*)
>
>  
>
> We have one yarn session started where 8 jobs are run. Three of
> them are consuming the same source (kafka) which is causing the
> backpressure, but only one of them experiences backpressure. The
> state of the job is 20 something MB and the checkpoint is
> configured as follows:
>
> *checkpointing.interval=*30 # makes sure value in  ms of
> progress happens between checkpoints
> *checkpointing.pause_between_checkpointing=*24 # checkpoints
> have to complete within value in ms or are discarded
> *checkpointing.timeou

Re: FW: Constant backpressure on flink job

2019-05-20 Thread Till Rohrmann
Hi Monika and Georgi,

it is quite hard to debug this problem remotely because one would need
access to the logs with DEBUG log level. Additionally, it would be great to
have a better understanding of what the individual operators do.

In general, 20 MB of state should be super easy to handle for Flink
(independent of incremental and full checkpoints). If the problem only
occurred after upgrading to Flink 1.7.2 and enabling incremental
checkpoints I would try to disable incremental checkpoints or to enable
incremental checkpoints with the older Flink version. Which version did you
use before?
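
For example (just an untested sketch, assuming the RocksDB backend is configured
in code; the checkpoint URI is a placeholder taken from your configuration),
incremental checkpoints can be switched off per job like this, or via
`state.backend.incremental: false` in flink-conf.yaml:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableIncrementalCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the second constructor argument toggles incremental checkpointing
        env.setStateBackend(new RocksDBStateBackend("hdfs:///bonusengine/checkpoints/", false));
        // ... build and execute the job as before
    }
}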

Usually, if a checkpoint expired before it can be completed this means that
either the checkpoint operations take too long or that the propagation of
the checkpoint barriers through the topology is slow. The former can be
caused by problems when writing to HDFS for example (retries, overloaded
HDFS, slow network connection). The latter can be caused if your operators
are slow processing the stream records (e.g. blocking call to external
system, too little resources, complex computation for each record).

Cheers,
Till

On Mon, May 20, 2019 at 9:10 AM Georgi Stoyanov  wrote:

> Hi guys,
>
>
>
> Could you help us find why our checkpoint fails so often (we don’t have
> huge state at all)?
>
> We are experiencing that problem for around 2 months just after upgrading
> to 1.7.2 and we configured incremental checkpointing. Could this be the
> reason?
>
>
>
> Regards,
>
> Georgi Stoyanov
>
>
>
> *From:* Monika Hristova 
> *Sent:* Tuesday, May 7, 2019 3:55 PM
> *To:* Dawid Wysakowicz ; user@flink.apache.org
> *Subject:* RE: Constant backpressure on flink job
>
>
>
> Hello Dawid,
>
>
>
> Thank you for your response.
>
> What I actually noticed is that first checkpoint starts failing with
> exception “Checkpoint expired before completing”. It keeps failing and then
> (after an hour or so ) the backpressure occurs. This is my job graph(please
> see attached files JobGraph_1) with marked in red problematic operators.
> After this the backpressure occurs and operators marked in red are those
> with high backpressure (JobGraph_2).
>
>
>
> Best Regards,
>
> Monika
>
>
>
>
>
> *From:* Dawid Wysakowicz 
> *Sent:* Thursday, April 25, 2019 11:14 AM
> *To:* Monika Hristova ; user@flink.apache.org
> *Subject:* Re: Constant backpressure on flink job
>
>
>
> Hi Monika,
>
> I would start with identifying the operator that causes backpressure. More
> information how to monitor backpressure you can find here in the docs[1].
> You might also be interested in Seth's (cc'ed) webinar[2], where he also
> talks how to debug backpressure.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure
>
> [2]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
>
> On 22/04/2019 17:44, Monika Hristova wrote:
>
> Hello,
>
>
>
> We are experiencing regular backpressure (at least once a week). I would
> like to ask how we can fix it.
>
>
>
> Currently our configurations are:
>
> *vi /usr/lib/flink/conf/flink-conf.yaml*
>
> # Settings applied by Cloud Dataproc initialization action
>
> jobmanager.rpc.address: bonusengine-prod-m-0
>
> jobmanager.heap.mb: 4096
>
> jobmanager.rpc.port: 6123
>
> taskmanager.heap.mb: 4096
>
> taskmanager.memory.preallocate: false
>
> taskmanager.numberOfTaskSlots: 8
>
> #taskmanager.network.numberOfBuffers: 21952 # legacy deprecated
>
> taskmanager.network.memory.fraction: 0.9
>
> taskmanager.network.memory.min: 67108864
>
> taskmanager.network.memory.max: 1073741824
>
> taskmanager.memory.segment-size: 65536
>
> parallelism.default: 52
>
> web.port: 8081
>
> web.timeout: 12
>
> heartbeat.interval: 1
>
> heartbeat.timeout: 10
>
> yarn.application-attempts: 10
>
> high-availability: zookeeper
>
> high-availability.zookeeper.quorum:
> bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
>
> high-availability.zookeeper.path.root: /flink
>
> #high-availability.zookeeper.storageDir: hdfs:///flink/recovery #
> legacy deprecated
>
> high-availability.storageDir: hdfs:///flink/recovery
>
> flink.partition-discovery.interval-millis=6
>
> fs.hdfs.hadoopconf: /etc/hadoop/conf
>
> state.backend: rocksdb
>
> state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
>
> state.savepoints.dir: hdfs:///bonusengine/savepoints
>
> metrics.reporters: stsd
>
> metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
>
> metrics.reporter.stsd.host: 127.0.0.1
>
> metrics.reporter.stsd.port: 8125
>
> zookeeper.sasl.disable: true
>
> yarn.reallocate-failed: true
>
> yarn.maximum-failed-containers: 32
>
> web.backpressure.refresh-interval: 6
>
> web.backpressure.num-samples: 100
>
> web.backpressure.delay-between-samples: 50
>
>
>
> with Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB and slaves -> 16 vCPUs,
> 60 GB with yarn configuration(*see attached file*)
>
>
>
> We have one

Connecting to a service (Eg: Kafka, Cassandra) with different keytabs from the same cluster

2019-05-20 Thread Kumar Bolar, Harshith
Hi all,
We have a central Flink cluster which will be used by multiple different teams 
(Data Science, Engineering etc). Each team has their own user and keytab to 
connect to services like Kafka, Cassandra etc. How should the jobs be 
configured such that different jobs use different keytabs and principals to 
connect to Kafka?
Right now with a single user, we have the following entry in the jaas.conf file.
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=false
   renewTicket=true
   useKeyTab=true
   keyTab="/opt/certs/flink-user.keytab"
   serviceName="kafka"
   principal="flink-u...@test.abc.com";
};
How can I add more users like this and reference them in the jobs separately?
Thanks,
Harshith


questions about yarn container(taskmanager)memory allocation

2019-05-20 Thread XiangWei Huang
Hi all,
Currently I am running my Flink application in YARN session mode, using the
command below:
bin/yarn-session.sh -d -s 3 -jm 1024 -tm 4096
When the taskmanager has started, I found that the container launch command is:
bin/java -Xms2765m -Xmx2765m -XX:MaxDirectMemorySize=1331m ..
From the Flink source code I've learned how the direct memory size and heap
memory size are calculated:
container total memory: 4096M
network memory fraction: 0.1
cut-off memory fraction: 0.25
networkMemory = 4096 * 0.1 = 409.6
cut-off memory = (4096 - 409.6) * 0.25 = 921.6
directMemorySize = 409.6 + 921.6 = 1331.2
heapsize = 4096 - 1331.2 = 2764.8

Below is the environment I'm using to run YARN and Flink:
  jdk version: 1.8
  flink version: 1.6.1
  OS: centos7

There are two questions about memory allocation I want to ask:
1. Since the JDK version I am using is 1.8, is it necessary to take the
metaspace memory into account in this calculation? The current way of
calculating the memory sizes (without metaspace memory) may cause the
container to run beyond the physical memory limit.
2. Is the native memory that RocksDB uses part of the direct memory (limited
by the JVM parameter MaxDirectMemorySize)? If not, how do I control the size
it uses?


Re: Connecting to a service (Eg: Kafka, Cassandra) with different keytabs from the same cluster

2019-05-20 Thread Dawid Wysakowicz
Hi Harshith,

I haven't tried it, but for Kafka you should be able to use the dynamic
sasl configuration of the underlying KafkaConsumer. Try setting the
`sasl.jaas.config` parameter for the FlinkKafkaConsumer as per the Kafka
documentation.
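
A rough, untested sketch of what that could look like per job (broker, topic,
keytab path and principal are placeholders, and it assumes the universal
FlinkKafkaConsumer):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerJobKafkaCredentials {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092");
        props.setProperty("group.id", "team-a-job");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        // per-job credentials instead of the cluster-wide KafkaClient entry in jaas.conf
        props.setProperty("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                        + "useKeyTab=true useTicketCache=false "
                        + "keyTab=\"/opt/certs/team-a.keytab\" "
                        + "principal=\"team-a@TEST.ABC.COM\";");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);
        // add the consumer as a source to the per-team job as usual
    }
}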

As far as I know, if you use Flink's built-in way of distributing
security contexts[1], you can provide only a single set of credentials for a
single Flink cluster.

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#kerberos-authentication-setup-and-configuration

On 20/05/2019 10:15, Kumar Bolar, Harshith wrote:
>
> Hi all,
>
> We have a central Flink cluster which will be used by multiple
> different teams (Data Science, Engineering etc). Each team has their
> own user and keytab to connect to services like Kafka, Cassandra etc.
> How should the jobs be configured such that different jobs use
> different keytabs and principals to connect to Kafka?
>
> Right now with a single user, we have the following entry in the
> jaas.conf file.
>
> KafkaClient {
>
>    com.sun.security.auth.module.Krb5LoginModule required
>
>    useTicketCache=false
>
>    renewTicket=true
>
>    useKeyTab=true
>
>    keyTab="/opt/certs/flink-user.keytab"
>
>    serviceName="kafka"
>
>    principal="flink-u...@test.abc.com";
>
> };
>
> How can I add more users like this and reference them in the jobs
> separately?
>
> Thanks,
>
> Harshith
>




Re: Possibly very slow keyBy in program with no parallelism

2019-05-20 Thread Dawid Wysakowicz
Hi Theo,

Could you try replacing the CEP operator with a simple flatMap to see if
CEP is the reason for the backpressure? Another reason for this behavior
might be the cost of serializing the records (what is the serialization
format?). You could also try enabling object reuse[1]. It would also be a
good idea to run the Flink job under a profiler to see the hot spots.

One more thing I noticed: you have set the bounded out-of-orderness to 30
minutes, which makes the CEP operator accumulate events for 30 minutes (for
the sorting) before they are forwarded to the underlying state machine for
processing. Could you try decreasing this value?
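
For illustration, a rough sketch of those two experiments, meant to be dropped
into your existing program (Databean is the type from your snippet; 1 minute is
just an arbitrary bound to compare against):

// enable object reuse for the whole job
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

// try a much smaller out-of-orderness bound than 30 minutes
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Databean>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(Databean element) {
                return element.getTs();
            }
        });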

I can't help with an explanation of how the network buffers work, but I have
cc'ed Piotr, who is better suited to provide meaningful information.

Regards,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/execution_configuration.html#execution-configuration

On 19/05/2019 14:16, Theo Diefenthal wrote:
>
> Hi,
>
>  
>
> I wrote a small Flink program on a yarn cluster (128GB RAM, 8 core
> Xeon CPU for each node) essentially reading messages from kafka and
> applying a simple CEP rule on those messages. The program is expected
> to have a parallelism of 1 for input as my test kafka topic has only 1
> partition.
>
>  
>
> The program looks pretty much like this:
>
>  
>
> FlinkKafkaConsumer flinkKafkaConsumer =
>     new FlinkKafkaConsumer<>(sourceTopicName, deserializer, properties);
> flinkKafkaConsumer.setStartFromGroupOffsets();
> flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
> flinkKafkaConsumer.assignTimestampsAndWatermarks(
>     new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(30)) {
>         @Override
>         public long extractTimestamp(Databean element) {
>             return element.getTs();
>         }
>     });
>
> SingleOutputStreamOperator kafkaSource = env.addSource(flinkKafkaConsumer);
>
> // Keying here is very slow???
> KeyedStream keyedStream = proxylogsStream.keyBy(bean -> bean.getUser());
>
> Pattern cepPattern = Pattern
>     .begin("firstStep", AfterMatchSkipStrategy.skipPastLastEvent()).where(new FirstFilter())
>     .followedBy("secondStep").where(new SecondFilter())
>     .within(Time.minutes(15));
> PatternStream patternMatchStream = CEP.pattern(keyedStream, cepPattern);
> SingleOutputStreamOperator beanAlerts = patternMatchStream.select(new MatchToAlarmConverter());
> beanAlerts.addSink(new FlinkKafkaProducer<>(config.kafkaAlertsTopic,
>     new AlarmBeanSerializeSchema(), properties));
>
>  
>
> The applied CEP filter “FirstFilter” and “SecondFilter” are very
> simple rules like
>
> return “humidity”.equals(bean.getRecordedMetricType())
>
>  
>
> My Databean has round about 50 elements containing numbers and small
> strings (Up to 50 characters).
>
> Currently, I write 200.000 Elements into my Kafka topic every 5
> minutes. 100.000 of those have the same username, i.e. all have the
> name “empty”, and the other half are almost unique. (some random
> number between 1 and 1). The generated data timestamp randomly
> varies +-7.5 minutes between the generated timestamp (Generation time
> = time pushed into kafka).
>
> My CEP rule is written with conditions that never match, so the kafka
> sink as well as the stream select function can be eliminated as causes
> for the slow processing speeds.
>
>  
>
> I start the application via yarn with:
>
> *"${FLINK_HOME}/bin/yarn-session.sh" *-n 4 -jm 4096m -tm 65536m --name 
> *"TESTJOB" *-d
> *$*{FLINK_HOME}/bin/flink run -m *$*{FLINK_HOST} -d -n *"${JOB_JAR}" *$*
>  
>
> So the job has plenty of RAM available, but I didn’t note any
> difference in terms of speed when assigning 16G or 64G of RAM.  As
> expected, I have a single task manager and parallelism 1.
>
>  
>
> Now about my problem:
>
> Currently, the pipeline processes round about 150-300 elements per
> second. On startup, it peaks to 3000-4000 elements per second but
> slows down within one minute to 150-300 elements per second.
>
> I immediately expected CEP to be that slow (As this is my first CEP
> experiment), but I observed the following:
>
>  1. Even though CEP has quite some overhead (elements must be sorted
> on time), my rule is very simple and should, in my perspective,
> perform much better on that machine. My shot in the dark before
> was something like 10.000 – 100.000 elements/s.
>  2. None of my machine resources is fully utilized, i.e. none of the
> cluster CPU runs at 100% utilization (according to htop). And the
> memory is virtually available, but the RES column in htop states
> the processes uses 5499MB.
>  3. According to Flink GUI, the job is split into two tasks: First
> there is the source task, then there is a hash arrow to the second
> task (the keyBy?!) and the second is cep-pattern apply,convert and
> write to sink.  From the UI, I know that in task 1 there is a HIGH
> backpressure whereas tas


Re: Apache Flink - How to pass configuration params in the flink-config.yaml file to local execution environment

2019-05-20 Thread Dawid Wysakowicz
Hi,

You should be able to pass the Configuration via:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#createLocalEnvironment(int,
org.apache.flink.configuration.Configuration)
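
For example (a minimal sketch; the two keys below are just arbitrary
flink-conf.yaml entries):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvWithConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("state.backend", "filesystem");                 // any flink-conf.yaml key
        conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");

        // parallelism + configuration for the embedded local cluster
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements(1, 2, 3).print();
        env.execute("local-config-example");
    }
}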

Regards,

Dawid

On 19/05/2019 20:49, M Singh wrote:
> Hey Flink Folks:
>
> I was trying to find out how can pass the params in the
> flink-config.yaml file to a application running in local env.
>
> Can I create a flink-config.yaml file and include it in the class path
> ? Or can I pass the parameters via the parameter tool ?
>
> Please let me know if there is any documentation on this.
>
> Thanks.
>
>
>




Re: monitor finished files on a Continuous Reader

2019-05-20 Thread Aljoscha Krettek
Hi,

I think what you’re trying to achieve is not possible with the out-of-box file 
source. The problem is that it is hard to know when a file can be deleted, i.e. 
there are multiple splits of a file and those are possibly read on different 
parallel operators. Plus, deletion/move of files has to happen after a 
checkpoint is confirmed, otherwise, the job might fail and would have to 
re-read those files.

You could get this working by implementing your own version of the continuous 
file monitor and file reader operators. You would have to ensure that one split 
always covers one complete file (maybe make your files small enough for that), 
then in the reader operator, after reading a split, you would store them in a 
list that you checkpoint. When a checkpoint is confirmed, i.e. in the 
notifyCheckpointComplete() method you can delete the files in that list.
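
To make that a bit more concrete, here is a very rough, untested sketch (all
class and variable names are made up; it assumes one split per file, whole
files being read inside run(), and finished paths being added under the
checkpoint lock):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Hypothetical source: emits file contents, remembers fully read files in
// operator state and deletes them only after the checkpoint that contains
// them has been confirmed.
public class DeletingFileSource extends RichSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<String> finishedFiles = new ArrayList<>();
    private transient ListState<String> checkpointedFiles;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // read whole files here; after a file has been fully emitted:
        // synchronized (ctx.getCheckpointLock()) { finishedFiles.add(path); }
    }

    @Override
    public void cancel() {
        // stop the read loop in a real implementation
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedFiles.clear();
        checkpointedFiles.addAll(finishedFiles);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedFiles = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("finished-files", String.class));
        for (String path : checkpointedFiles.get()) {
            finishedFiles.add(path);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the data of these files is now part of a completed checkpoint,
        // so it should be safe to delete them
        for (String path : finishedFiles) {
            Files.deleteIfExists(Paths.get(path));
        }
        finishedFiles.clear();
    }
}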

I hope that helps.

Best,
Aljoscha

> On 20. May 2019, at 09:53, Hanan Yehudai  wrote:
> 
> Hi
> I'm looking for a way to delete / rename files that are done loading.
>  
> I'm using env.readFile, monitoring a directory for all new files. Once a file 
> has been fully read, I would like to delete it.
> Is there a way to monitor the closed splits in the continuous reader? Is 
> there a different way to do this?
>  
>  
> Regards,
> Hanan



RE: monitor finished files on a Continuous Reader

2019-05-20 Thread Hanan Yehudai
It helps! Thank you 😊

From: Aljoscha Krettek 
Sent: 20 May 2019 12:45
To: Hanan Yehudai 
Cc: user@flink.apache.org
Subject: Re: monitor finished files on a Continuous Reader

Hi,

I think what you’re trying to achieve is not possible with the out-of-box file 
source. The problem is that it is hard to know when a file can be deleted, i.e. 
there are multiple splits of a file and those are possibly read on different 
parallel operators. Plus, deletion/move of files has to happen after a 
checkpoint is confirmed, otherwise, the job might fail and would have to 
re-read those files.

You could get this working by implementing your own version of the continuous 
file monitor and file reader operators. You would have to ensure that one split 
always covers one complete file (maybe make your files small enough for that), 
then in the reader operator, after reading a split, you would store them in a 
list that you checkpoint. When a checkpoint is confirmed, i.e. in the 
notifyCheckpointComplete() method you can delete the files in that list.

I hope that helps.

Best,
Aljoscha


On 20. May 2019, at 09:53, Hanan Yehudai 
mailto:hanan.yehu...@radcom.com>> wrote:

Hi
I'm looking for a way to delete / rename files that are done loading.

I'm using env.readFile, monitoring a directory for all new files. Once a file 
has been fully read, I would like to delete it.
Is there a way to monitor the closed splits in the continuous reader? Is there 
a different way to do this?


Regards,
Hanan



Re: questions about yarn container(taskmanager)memory allocation

2019-05-20 Thread Xintong Song
Hi XiangWei,

Thank you for the input. I agree with you that it is possible that
containers use extra memory on JDK 1.8. As for native memory, it is memory
used by the JVM and other processes outside the JVM heap, so it is not
limited by MaxDirectMemorySize.

The community is working on a plan to refactor Flink's resource management.
AFAIK, native memory is on the table, but I'm not sure whether metaspace
memory is considered. I think we should create a JIRA issue for this.
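
As a possible stop-gap until then (just a sketch; the keys exist in Flink 1.6,
but the concrete numbers are only placeholders you would have to tune), you can
cap metaspace explicitly and enlarge the container cut-off so that non-heap
memory stays inside the YARN limit:

# flink-conf.yaml
env.java.opts.taskmanager: "-XX:MaxMetaspaceSize=256m"
containerized.heap-cutoff-ratio: 0.35
containerized.heap-cutoff-min: 600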

Thank you~

Xintong Song



On Mon, May 20, 2019 at 4:47 PM XiangWei Huang 
wrote:

> Hi all,
> Currently i am running my flink application in yarn session mode and
> using below commnad :
> *bin/yarn-session.sh -d -s 3 -jm 1024 -tm 4096*
> when taskmanager complete to started,i found the container launching
> command is :
> * bin/java -Xms2765m -Xmx2765m -XX:MaxDirectMemorySize=1331m ..*
> from flink source code,i'v learned how direct memory size and heap memory
> size calculated :
> container total memory: 4096M
> network memory fraction: 0.1
> cut off memory fraction: 0.25
> networkMemory = 4096*0.1 = 409.6
> cut off memory = (4096 - 409.6) * 0.25 = 921.6
> *directMemorySize* = 409.6 + 921.6 = 1331.2
> *heapsize *= 4096 - 1331.2 = 2764.8
>
> Below is the environment i'm using to run yarn and flink:
> *  jdk version: 1.8*
> * flink version: 1.6.1*
> *OS: centos7*
>
> there are two questions about memory allocation i want to ask:
> 1. Since the jdk version i an using is 1.8,is it necessary to consider the 
> *metaspace
> memory *into calculation.According to the current way to calculate memory
> size(without metaspace memory) may cause
> container running beyond physical memory limit.
> 2. Is the native memory that rocksdb using part of direct memory(limit by
> jvm parameter  *MaxDirectMemorySize*),if not how do i control the size it
> used.
>


Enabling detailed inbound/outbound network queue metrics

2019-05-20 Thread Sergii Mykhalchuk
Hello guys,

We are trying to track the network queues. After enabling 
"taskmanager.network.detailed-metrics" as described in the docs [1], we do not 
see any of the *QueueLen metrics listed in [2] in our metrics reporter (slf4j, 
used for debugging purposes).

We are using Flink 1.8.0 at the moment.

Do you guys know if it is a known issue or not?

Thanks in advance.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-network-detailed-metrics
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#network
 



Sergii Mykhalchuk
Developer

Booking.com B.V.
Nieuwe Weteringstraat 36-38 Amsterdam 1017 ZX Netherlands
Direct +31207243491
 



Re: Re: Flink not giving full reason as to why job submission failed

2019-05-20 Thread Kumar Bolar, Harshith
Hi Wouter,

I’ve upgraded Flink to 1.8, but now I only see Internal server error on the 
dashboard when a job deployment fails.


But in the logs I see the correct exception -
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
  at 
functions.PersistPIDMessagesToCassandra.main(PersistPIDMessagesToCassandra.java:59)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
  ... 9 more

Is there any way to show these errors on the dashboard? Because many 
application teams deploy jobs through the dashboard and don’t have ready access 
to the logs.

Thanks,
Harshith

From: Wouter Zorgdrager 
Date: Thursday, 16 May 2019 at 7:56 PM
To: Harshith Kumar Bolar 
Cc: user 
Subject: [External] Re: Flink not giving full reason as to why job submission 
failed

Hi Harshith,

This was indeed an issue in 1.7.2, but fixed in 1.8.0. See the corresponding 
Jira issue [1].

Cheers,
Wouter

[1]: 
https://issues.apache.org/jira/browse/FLINK-11902

Op do 16 mei 2019 om 16:05 schreef Kumar Bolar, Harshith 
mailto:hk...@arity.com>>:
Hi all,

After upgrading Flink to 1.7.2, when I try to submit a job from the dashboard 
and there's some issue with the job, the job submission fails with the 
following error.

Exception occurred in REST handler: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
There's no other reason given as to why the job failed to submit. This was not 
the case in 1.4.2. Is there a way to see the full reason why the job failed to 
deploy? I see the same error in the logs too with no additional information.
Thanks,
Harshith


Re: Re: Flink not giving full reason as to why job submission failed

2019-05-20 Thread Wouter Zorgdrager
Hi Harshith,

This is indeed an issue not resolved in 1.8. I added a comment to the
(closed) Jira issue, so this might be fixed in further releases.

Cheers,
Wouter

Op ma 20 mei 2019 om 16:18 schreef Kumar Bolar, Harshith :

> Hi Wouter,
>
>
>
> I’ve upgraded Flink to 1.8, but now I only see Internal server error on
> the dashboard when a job deployment fails.
>
>
>
>
>
> But in the logs I see the correct exception -
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
>
>   at
> functions.PersistPIDMessagesToCassandra.main(PersistPIDMessagesToCassandra.java:59)
>
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>   at java.lang.reflect.Method.invoke(Method.java:498)
>
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>
>   ... 9 more
>
>
>
> Is there any way to show these errors on the dashboard? Because many
> application teams deploy jobs through the dashboard and don’t have ready
> access to the logs.
>
>
>
> Thanks,
>
> Harshith
>
>
>
> *From: *Wouter Zorgdrager 
> *Date: *Thursday, 16 May 2019 at 7:56 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *user 
> *Subject: *[External] Re: Flink not giving full reason as to why job
> submission failed
>
>
>
> Hi Harshith,
>
>
>
> This was indeed an issue in 1.7.2, but fixed in 1.8.0. See the
> corresponding Jira issue [1].
>
>
>
> Cheers,
>
> Wouter
>
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-11902
> 
>
>
>
>
> Op do 16 mei 2019 om 16:05 schreef Kumar Bolar, Harshith  >:
>
> Hi all,
>
>
>
> After upgrading Flink to 1.7.2, when I try to submit a job from the
> dashboard and there's some issue with the job, the job submission fails
> with the following error.
>
>
>
> Exception occurred in REST handler:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
>
> There's no other reason given as to why the job failed to submit. This was
> not the case in 1.4.2. Is there a way to see the full reason why the job
> failed to deploy? I see the same error in the logs too with no additional
> information.
>
> Thanks,
>
> Harshith
>
>


Re: Table program cannot be compiled

2019-05-20 Thread Timo Walther

Hi Shahar,

yes, the number of parameters could be the cause of a "cannot compile" 
exception. If you moved most of the constants to a member of the 
function, it should actually work.


Do you have a little reproducible example somewhere?

Thanks,
Timo



On 16.05.19 at 19:59, shkob1 wrote:

Hi Timo,

Thanks for the link.
Not sure i understand your suggestion though, is the goal here reducing the
amount of parameters coming to the UDF? if thats the case i can maybe have
the tag names there, but still need the expressions to get evaluated before
entering the eval. Do you see this in a different way?

I tried moving the tag names to be a member within the function instead of a
parameter, but apparently i still have too many arguments.

Let me know if this is not what you meant.

Thanks!
Shahar



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Generic return type on a user-defined scalar function

2019-05-20 Thread Timo Walther

Hi Morrisa,

usually, this means that your class is not recognized as a POJO. Please 
check the requirements for a POJO again: default constructor, getters and 
setters for every field, etc. You can use 
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your 
class is a POJO or not.
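
A quick way to check this (just a sketch; MyPojo stands in for your class):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class PojoCheck {

    // a minimal valid POJO: public class, public no-arg constructor,
    // public fields (or getters/setters) of supported types
    public static class MyPojo {
        public String name;
        public long value;
        public MyPojo() {}
    }

    public static void main(String[] args) {
        // throws an InvalidTypesException if MyPojo does not fulfill the POJO
        // rules, otherwise prints the extracted PojoTypeInfo with its fields
        TypeInformation<MyPojo> info = Types.POJO(MyPojo.class);
        System.out.println(info);
    }
}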


I hope this helps.

Regards,
Timo


On 16.05.19 at 23:18, Morrisa Brenner wrote:


Hi Flink folks,


In a Flink job using the SQL API that I’m working on, I have a custom 
POJO data type with a generic field, and I would like to be able to 
call a user-defined function on this field.I included a similar 
function below with the business logic stubbed out, but this example 
has the return type I'm looking for.



I have no issues using custom functions of this type when they're used 
in a select statement and the `getResultType` method is excluded from 
the user-defined function class, but I am unable to get the type 
information to resolve correctly in contexts like order by and group 
by statements. It still doesn't work even if the `getResultType` 
method defines the specific type for a given object explicitly because 
the job compiler within Flink seems to be assuming the return type 
from the `eval` method is just an Object (type erasure...), and it 
fails to generate the object code because it's detecting invalid casts 
to the desired output type. Without the `getResultType` method, it 
just fails to detect type entirely. This seems to be fine when it's 
just a select, but if I try to make it do any operation (like group 
by) I get the following error: 
"org.apache.flink.api.common.InvalidProgramException: This type 
(GenericType) cannot be used as key."



Does anyone know if there's a way to get Flink to pay attention to the 
type information from `getResultType` when compiling the `eval` method 
so that the types work out? Or another way to work around the type 
erasure on the eval method without defining explicit user-defined 
function classes for each type?



Thanks for your help!


Morrisa




Code snippet:



package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 *
 * T should be able to be a String, long, float, boolean, or Date
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<T> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation[]{
            TypeInformation.of(desiredType)
        };
    }
}



--
Morrisa Brenner
Software Engineer

225 Franklin St, Boston, MA 02110
klaviyo.com 





[ANNOUNCE] Seattle Flink Meetup at AWS on May 30

2019-05-20 Thread Bowen Li
Hi Greater Seattle folks!

We are hosting our next meetup with AWS Kinesis Analytics team on May 30
next Thursday in downtown Seattle.

We feature two talks this time:

   1. *"AWS Kinesis Analytics: running Flink serverless in multi-tenant
   environment"* by Kinesis Analytics team on:
  - How to run Apache Flink applications using Amazon Kinesis Data
  Analytics service
  - Challenges they faced building a multi-tenant serverless Flink
  service
   2. *"How to contribute to Flink: a hands-on guide to start your Apache
    open-source journey"* by me, to get developers on board given the
    considerable rising interest in contributing to Flink from the local tech
    community

Please find details and RSVP at
https://www.meetup.com/seattle-flink/events/260865206/  See you next week!

Cheers,
Bowen


Flink vs KStreams

2019-05-20 Thread Peter Groesbeck
Hi folks,

I'm hoping to get some deeper clarification on which framework, Flink or
KStreams, to use in a given scenario. I've read over the following blog
article which I think sets a great baseline understanding of the
differences between those frameworks but I would like to get some outside
opinions:
https://www.confluent.io/blog/apache-flink-apache-kafka-streams-comparison-guideline-users/

My understanding of this article is that KStreams works well as an embedded
library in a microservice, API layer, or as a standalone application at a
company with centralized deployment infrastructure, such as a shared
Kubernetes cluster.

In this case, there is discussion around deploying KStreams as a standalone
application stack backed by EC2 or ECS, and whether or not Flink is better
suited to serve as the data transformation layer. We already do run Flink
applications on EMR.

The point against Flink is that it requires a cluster whereas KStreams does
not, and can be deployed on ECS or EC2. We do not have a centralized
deployment team, and will still have to maintain either the CloudFormation
stack / Auto Scaling group or the EMR cluster ourselves.

What are some of the advantages of using Flink over KStreams standalone?

The Job management UI is one that comes to mind, and another are some of
the more advanced API options such as CEP. But I would really love to hear
the opinions of people who are familiar with both. In what scenarios would
you choose one over the other? Is it advisable or even preferable to
attempt to deploy KStreams as it's own stack and avoid the complexity of
maintaining a cluster?

Thanks,
Peter


Flink cluster log organization

2019-05-20 Thread Soheil Pourbafrani
Hi,

I have a Flink multi-node cluster and I use the Flink standalone scheduler to
deploy applications on the cluster. When I deploy applications, I can see some
log files created under FLINK_HOME/logs, but there is no separate log file for
each application; all application logs are merged into the same files. I have
no idea how Flink organizes logs in standalone mode. I was wondering if it's
possible to have a separate log file for each application, or, for example, to
set the log file name and path when I submit an application to the cluster?


Re: Flink vs KStreams

2019-05-20 Thread Timothy Victor
This is probably a very subjective question, but nevertheless here are my
reasons for choosing Flink over KStreams or even Spark.

a) KStreams couples you tightly to Kafka, and I personally don't want my
stream processing engine to be married to my message bus. There are other
(even better) alternatives to Kafka depending on your requirements, so I
thought it best to keep these two separate.

b) Fault tolerance in KStreams relies on intermediate Kafka topics. The
increased network and I/O for messages to/from Kafka for bookkeeping was too
high for my use cases. The concepts of local state and asynchronous
barrier snapshotting are key reasons why Flink does better in this
area. You can't just bolt that onto a framework that didn't get those
concepts right from the beginning.

c) Lack of fine-grained control over operator parallelism. A graph may
have different computation hot spots where it may be worth fanning out to
process over multiple nodes in parallel. In KStreams, as well as in Spark, you
can't really do that. In KStreams the parallelism is tightly bound to the
number of partitions. This means that your entire processing graph needs
to align well with the way you've partitioned your data in order to
maximize throughput. You can't (or maybe don't even want to) do that.

d)  The API lends itself well to describing DAGs.   I found Spark and
KStreams not so intuitive in this regard.

HTH

Tim

On Mon, May 20, 2019, 5:30 PM Peter Groesbeck 
wrote:

> Hi folks,
>
> I'm hoping to get some deeper clarification on which framework, Flink or
> KStreams, to use in a given scenario. I've read over the following blog
> article which I think sets a great baseline understanding of the
> differences between those frameworks but I would like to get some outside
> opinions:
>
> https://www.confluent.io/blog/apache-flink-apache-kafka-streams-comparison-guideline-users/
>
> My understanding of this article is that KStreams works well as an
> embedded library in a microservice, API layer, or as a standalone
> application at a company with centralized deployment infrastructure, such
> as a shared Kubernetes cluster.
>
> In this case, there is discussion around deploying KStreams as a
> standalone application stack backed by EC2 or ECS, and whether or not Flink
> is better suited to serve as the data transformation layer. We already do
> run Flink applications on EMR.
>
> The point against Flink is that it requires a cluster whereas KStreams
> does not, and can be deployed on ECS or EC2. We do not have a centralized
> deployment team, and will still have to maintain either the CNF
> Stack/AutoScaling Group or EMR Cluster ourselves.
>
> What are some of the advantages of using Flink over KStreams standalone?
>
> The Job management UI is one that comes to mind, and another are some of
> the more advanced API options such as CEP. But I would really love to hear
> the opinions of people who are familiar with both. In what scenarios would
> you choose one over the other? Is it advisable or even preferable to
> attempt to deploy KStreams as it's own stack and avoid the complexity of
> maintaining a cluster?
>
> Thanks,
> Peter
>


Re: Flink cluster log organization

2019-05-20 Thread Xintong Song
Hi Soheil,

Do you mean you are running multiple jobs in the same Flink standalone
cluster? Or running multiple Flink standalone clusters on the same set of
machines?

For the former, I don't think there is an easy way to separate the logs.
The log file contains logs of common framework activities that do not
belong to any particular job.

For the latter, there are two ways that I can think of to specify the log
file path for a Flink cluster (see the sketch below):

   - Set env.log.dir in flink-conf.yaml
   - Modify log4j.appender.file.file in log4j.properties
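
For example (the paths are placeholders), a second cluster could get its own
log location with:

# flink-conf.yaml
env.log.dir: /var/log/flink/cluster-b

# conf/log4j.properties
log4j.appender.file.file=/var/log/flink/cluster-b/flink.log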

Hope that helps.

Thank you~

Xintong Song



On Tue, May 21, 2019 at 7:03 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> I have a Flink multinode cluster and I use Flink standalone scheduler to
> deploy applications on the cluster. When I deploy applications on the
> cluster I can see some log files on the path FLINK_HOME/logs will be
> created but there is no separate log file for each application and all
> application logs are merged into files. I have no idea how Flink organize
> logs on standalone mode. I was wondering if it's possible to have a
> separate log file for each application? or for example set the log file
> name and path when I submit the applications on the cluster?
>


Queryable State race condition or serialization errors?

2019-05-20 Thread burgesschen
Hi Guys,

I observed some strange behaviors while using Queryable state with Flink
1.6.2. Here is the story:

My state is of type MapState[String, Map[String, String]]; the inner map is
frequently updated. Upon querying, the returned inner map sometimes misses
some fields. What's more, sometimes the returned inner map has values
assigned to the wrong keys!

Changing the type to MapState[String, String] seems to solve the problem.

The code is a little too deep to dig into, but my guess is that when the
state is being updated and queried at the same time, there can be a race
condition that corrupts the data. Please let me know if you have a better
idea of what could be happening. Much appreciated!

Best,
Burgess Chen  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Generic return type on a user-defined scalar function

2019-05-20 Thread JingsongLee
Hi Morrisa:

It seems that the Flink planner does not support returning Object (or a
generic type that gets erased, as you say) from a ScalarFunction.
In ScalarFunctionCallGen:
val functionCallCode =
  s"""
|${parameters.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = $functionReference.eval(
|  ${parameters.map(_.resultTerm).mkString(", ")});
|""".stripMargin
There would need to be a coercive cast of the eval return value to support
this situation.
I don't see a way to bypass it. If you can modify the source code, you can
change it as follows to support a generic return type:
val functionCallCode =
  s"""
|${parameters.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
|  ${parameters.map(_.resultTerm).mkString(", ")});
|""".stripMargin

Best, JingsongLee


--
From:Timo Walther 
Send Time: Monday, May 20, 2019, 23:03
To:user 
Subject:Re: Generic return type on a user-defined scalar function

 
Hi Morrisa,

usually, this means that you class is not recognized as a POJO. Please check 
again the requirements of a POJO: Default constructor, getters and setters for 
every field etc. You can use 
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify if your class is 
a POJO or not.

I hope this helps.

Regards,
Timo


On 16.05.19 at 23:18, Morrisa Brenner wrote:


Hi Flink folks, 
In a Flink job using the SQL API that I’m working on, I have a custom POJO data 
type with a generic field, and I would like to be able to call a user-defined 
function on this field. I included a similar function below with the business 
logic stubbed out, but this example has the return type I'm looking for. 
I have no issues using custom functions of this type when they're used in a 
select statement and the `getResultType` method is excluded from the 
user-defined function class, but I am unable to get the type information to 
resolve correctly in contexts like order by and group by statements. It still 
doesn't work even if the `getResultType` method defines the specific type for a 
given object explicitly because the job compiler within Flink seems to be 
assuming the return type from the `eval` method is just an Object (type 
erasure...), and it fails to generate the object code because it's detecting 
invalid casts to the desired output type. Without the `getResultType` method, 
it just fails to detect type entirely. This seems to be fine when it's just a 
select, but if I try to make it do any operation (like group by) I get the 
following error: "org.apache.flink.api.common.InvalidProgramException: This 
type (GenericType) cannot be used as key." 
Does anyone know if there's a way to get Flink to pay attention to the type 
information from `getResultType` when compiling the `eval` method so that the 
types work out? Or another way to work around the type erasure on the eval 
method without defining explicit user-defined function classes for each type? 
Thanks for your help! 
Morrisa 


Code snippet: 

package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<T> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation[]{
            TypeInformation.of(desiredType)
        };
    }
}

 -- 



Morrisa Brenner
Software Engineer 



 225 Franklin St, Boston, MA 02110
klaviyo.com 