Re: OVER operator filtering out records

2019-08-23 Thread Vinod Mehra
Things improved during bootstrapping, when the event volume was larger. But
as soon as the traffic slowed down, the events got stuck (buffered?) at the
OVER operator for a very long time (several hours).
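
A plausible explanation, consistent with the parallelism observation: the
OVER operator only emits a row once a watermark passes the row's timestamp,
and its event-time clock is the minimum watermark across all of its input
channels. With a parallelism of 64 and only 1 or 2 events per hour, most
subtasks stay idle, never advance their watermark, and the minimum stalls,
so rows sit buffered until traffic picks up. With a parallelism of 1 there
is a single channel that advances with every event. A common workaround on
pre-1.11 versions is a periodic assigner that also advances off the wall
clock when a subtask is quiet. A minimal sketch (the Event type, its
getRowtime() accessor and the 30s bound are illustrative assumptions, not
taken from the original job):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: keep the watermark moving even when this subtask sees no events,
// so one quiet channel cannot stall the operator-wide minimum. Only safe if
// event time roughly tracks processing time for this stream.
public class IdleTolerantWatermarks implements AssignerWithPeriodicWatermarks<Event> {
    private static final long MAX_DELAY_MS = 30_000L; // assumed lateness bound
    // initialized so that the subtraction below cannot underflow
    private long maxSeenTimestamp = Long.MIN_VALUE + MAX_DELAY_MS;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.getRowtime());
        return element.getRowtime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // advance off the wall clock during quiet periods
        long fromWallClock = System.currentTimeMillis() - MAX_DELAY_MS;
        return new Watermark(Math.max(maxSeenTimestamp - MAX_DELAY_MS, fromWallClock));
    }
}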

On Fri, Aug 23, 2019 at 5:04 PM Vinod Mehra  wrote:

> (Forgot to mention that we are using Flink 1.4)
>
> Update: Earlier the OVER operator was assigned a parallelism of 64. I
> reduced it to 1 and the problem went away! Now the OVER operator is not
> filtering/buffering the events anymore.
>
> Can someone explain this please?
>
> Thanks,
> Vinod
>
> On Fri, Aug 23, 2019 at 3:09 PM Vinod Mehra  wrote:
>
>> We have a SQL-based Flink job which consumes a very low volume stream
>> (1 or 2 events every few hours):
>>
>>
>>
>>
>>
>>
>> SELECT
>>   user_id,
>>   COUNT(*) OVER (PARTITION BY user_id ORDER BY rowtime
>>     RANGE INTERVAL '30' DAY PRECEDING) AS count_30_days,
>>   COALESCE(occurred_at, logged_at) AS latency_marker,
>>   rowtime
>> FROM event_foo
>> WHERE user_id IS NOT NULL
>>
>> The OVER operator seems to filter out events, as per the Flink dashboard
>> (records received = records sent = 0).
>>
>> The operator looks like this:
>>
>> over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 259200
>> PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0)) ->
>> select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker,
>> rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords ->
>> sample_without_formatter
>>
>> I know that the OVER operator can discard late-arriving events, but these
>> events are definitely not arriving late. The watermark for all operators
>> stays at 0 because the number of output events is 0.
>>
>> We have an identical SQL job against a high-volume stream that is working
>> fine. Watermarks progress in a timely manner and events are delivered in a
>> timely manner as well.
>>
>> Any idea what could be going wrong? Are the events getting buffered
>> waiting for a certain number of events? If so, what is the threshold?
>>
>> Thanks,
>> Vinod
>>
>


Re: Problem with Flink on Yarn

2019-08-23 Thread Rong Rong
It seems like your Kerberos server has started issuing invalid tokens to
your job manager.
Can you share how your Kerberos setting is configured? This might also
relate to how your KDC servers are configured.
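
For reference, the Flink-side Kerberos settings usually live in
flink-conf.yaml (these keys exist as of Flink 1.4+; the paths, principal and
contexts below are placeholders):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@YOUR.REALM
security.kerberos.login.contexts: Client,KafkaClient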

--
Rong

On Fri, Aug 23, 2019 at 7:00 AM Zhu Zhu  wrote:

> Hi Juan,
>
> Have you tried a Flink release built with Hadoop 2.7 or a later version?
> If you are using Flink 1.8/1.9, it should be the pre-bundled Hadoop 2.7+
> jar, which can be found on the Flink download page.
>
> I think YARN-3103 is about AMRMClientImpl.class, and it is in the Flink
> shaded Hadoop jar.
>
> Thanks,
> Zhu Zhu
>
> Juan Gentile  于2019年8月23日周五 下午7:48写道:
>
>> Hello!
>>
>>
>>
>> We are running Flink on Yarn and we are currently getting the following
>> error:
>>
>>
>>
>> 2019-08-23 06:11:01,534 WARN  org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as: (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
>> 2019-08-23 06:11:01,535 WARN  org.apache.hadoop.ipc.Client - Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
>> 2019-08-23 06:11:01,536 WARN  org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:  (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
>> 2019-08-23 06:11:01,581 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler - Exception while invoking ApplicationMasterProtocolPBClientImpl.allocate over rm0. Not retrying because Invalid or Cancelled Token
>> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1564713228886_5299648_01
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
>> at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
>> at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
>> at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:288)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:206)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:188)
>> at com.sun.proxy.$Proxy26.allocate(Unknown Source)
>> at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)
>> at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:224)
>> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
>> at org.apache.hadoop.ipc.Client.call(Client.java:1472)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1409)
>> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
>> at com.sun.proxy.$Proxy25.allocate(Unknown Source)
>> at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
>> ... 9 more
>>
>>
>>
>> The Flink cluster runs OK for a while, but then after a day we get this
>> error again. We haven't made changes to our code, which is why it's hard
>> to understand why we suddenly started to see this.
>>
>>
>>
>> We found this issue reported on Yarn
>> https://issues.apache.org/jira/browse/YARN-3103 but our version of Yarn
>> already has that fix.
>>
>>
>>
>> Any help will be appreciated.
>>
>>
>>
>> Thank you,
>>
>> Juan
>>
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Gavin Lee
Got it.
Thanks Till & Zili.
+1 for having the release notes cover such issues.


On Fri, Aug 23, 2019 at 11:01 PM Oytun Tez  wrote:

> Hi all,
>
> We also had to roll back our upgrade effort for 2 reasons:
>
> - Official Docker container is not ready yet
> - This artefact is not published with
> scala: org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.9.0
>
>
>
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Aug 23, 2019 at 10:48 AM Zili Chen  wrote:
>
>> Hi Till,
>>
>> Did we mention this in release note(or maybe previous release note where
>> we did the exclusion)?
>>
>> Best,
>> tison.
>>
>>
>> Till Rohrmann  于2019年8月23日周五 下午10:28写道:
>>
>>> Hi Gavin,
>>>
>>> if I'm not mistaken, then the community excluded the Scala FlinkShell
>>> since a couple of versions for Scala 2.12. The problem seems to be that
>>> some of the tests failed. See here [1] for more information.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10911
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 23, 2019 at 11:44 AM Gavin Lee  wrote:
>>>
 I used package on apache official site, with mirror [1], the difference
 is
 I used scala 2.12 version.
 I also tried to build from source for both scala 2.11 and 2.12, when
 build
 2.12 the FlinkShell.class is in flink-dist jar file but after running
 mvn
 clean package -Dscala-2.12, this class was removed in
 flink-dist_2.12-1.9 jar
 file.
 Seems broken here for scala 2.12, right?

 [1]

 http://mirror.bit.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz

 On Fri, Aug 23, 2019 at 4:37 PM Zili Chen  wrote:

 > I downloaded 1.9.0 dist from here[1] and didn't see the issue. Where
 do you
 > download it? Could you try to download the dist from [1] and see
 whether
 > the problem last?
 >
 > Best,
 > tison.
 >
 > [1]
 >
 http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
 >
 >
 > Gavin Lee  于2019年8月23日周五 下午4:34写道:
 >
 >> Thanks for your reply @Zili.
 >> I'm afraid it's not the same issue.
 >> I found that the FlinkShell.class was not included in flink dist jar
 file
 >> in 1.9.0 version.
 >> Nowhere can find this class file inside jar, either in opt or lib
 >> directory under root folder of flink distribution.
 >>
 >>
 >> On Fri, Aug 23, 2019 at 4:10 PM Zili Chen 
 wrote:
 >>
 >>> Hi Gavin,
 >>>
 >>> I also find a problem in shell if the directory contain whitespace
 >>> then the final command to run is incorrect. Could you check the
 >>> final command to be executed?
 >>>
 >>> FYI, here is the ticket[1].
 >>>
 >>> Best,
 >>> tison.
 >>>
 >>> [1] https://issues.apache.org/jira/browse/FLINK-13827
 >>>
 >>>
 >>> Gavin Lee  于2019年8月23日周五 下午3:36写道:
 >>>
  Why bin/start-scala-shell.sh local return following error?
 
  bin/start-scala-shell.sh local
 
  Error: Could not find or load main class
  org.apache.flink.api.scala.FlinkShell
  For flink 1.8.1 and previous ones, no such issues.
 
  On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
 
 > Congratulations and thanks for the hard work!
 >
 > Qi
 >
 > On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai <
 tzuli...@apache.org>
 > wrote:
 >
 > The Apache Flink community is very happy to announce the release
 of
 > Apache Flink 1.9.0, which is the latest major release.
 >
 > Apache Flink® is an open-source stream processing framework for
 > distributed, high-performing, always-available, and accurate data
 streaming
 > applications.
 >
 > The release is available for download at:
 > https://flink.apache.org/downloads.html
 >
 > Please check out the release blog post for an overview of the
 > improvements for this new major release:
 > https://flink.apache.org/news/2019/08/22/release-1.9.0.html
 >
 > The full release notes are available in Jira:
 >
 >
 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
 >
 > We would like to thank all contributors of the Apache Flink
 community
 > who made this release possible!
 >
 > Cheers,
 > Gordon
 >
 >
 >
 
  --
  Gavin
 
 >>>
 >>
 >> --
 >> Gavin
 >>
 >

 --
 Gavin

>>>

-- 
Gavin


Re: Per Partition Watermarking source idleness

2019-08-23 Thread Eduardo Winpenny Tejedor
Hi Prakhar,

Everything is probably working as expected: if a partition does not receive
any messages, then the watermark of the operator does not advance (it is the
minimum across all partitions).

You'll need to define a strategy for the watermark to advance even when no
messages are received for a particular partition.
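
On Flink 1.6 one option is to put the timestamp/watermark assigner on the
Kafka consumer itself; the source then tracks watermarks per Kafka
partition. Note that a plain bounded-out-of-orderness extractor will still
stall on a partition that receives nothing, so the assigner additionally
needs to advance when idle (for example off the wall clock, as in the
assigner sketched in the OVER operator thread above). A sketch, where the
Event type, EventDeserializationSchema, topics and kafkaProps are
illustrative placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Sketch: per-partition watermarking by assigning the extractor on the
// consumer (not on the resulting stream).
FlinkKafkaConsumer011<Event> consumer =
    new FlinkKafkaConsumer011<>(topics, new EventDeserializationSchema(), kafkaProps);
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event element) {
            return element.getEventTime();
        }
    });
DataStream<Event> events = env.addSource(consumer);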

Regards,
Eduardo


On Fri, 23 Aug 2019, 10:35 Prakhar Mathur,  wrote:

> Hi,
>
> We are using Flink v1.6. We are facing data loss issues while consuming
> data from older offsets in Kafka with windowing. We are exploring a
> per-partition watermarking strategy. But we noticed that when we are trying
> to consume from multiple topics, if any of the partitions is not receiving
> data, it just blocks everything. Is there a known solution for this?
>


Re: OVER operator filtering out records

2019-08-23 Thread Vinod Mehra
(Forgot to mention that we are using Flink 1.4)

Update: Earlier the OVER operator was assigned a parallelism of 64. I
reduced it to 1 and the problem went away! Now the OVER operator is not
filtering/buffering the events anymore.

Can someone explain this please?

Thanks,
Vinod

On Fri, Aug 23, 2019 at 3:09 PM Vinod Mehra  wrote:

> We have a SQL-based Flink job which consumes a very low volume stream (1
> or 2 events every few hours):
>
>
>
>
>
>
> SELECT
>   user_id,
>   COUNT(*) OVER (PARTITION BY user_id ORDER BY rowtime
>     RANGE INTERVAL '30' DAY PRECEDING) AS count_30_days,
>   COALESCE(occurred_at, logged_at) AS latency_marker,
>   rowtime
> FROM event_foo
> WHERE user_id IS NOT NULL
>
> The OVER operator seems to filter out events, as per the Flink dashboard
> (records received = records sent = 0).
>
> The operator looks like this:
>
> over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 259200
> PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0)) ->
> select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker,
> rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords ->
> sample_without_formatter
>
> I know that the OVER operator can discard late-arriving events, but these
> events are definitely not arriving late. The watermark for all operators
> stays at 0 because the number of output events is 0.
>
> We have an identical SQL job against a high-volume stream that is working
> fine. Watermarks progress in a timely manner and events are delivered in a
> timely manner as well.
>
> Any idea what could be going wrong? Are the events getting buffered
> waiting for a certain number of events? If so, what is the threshold?
>
> Thanks,
> Vinod
>


Re: [SURVEY] How do you use high-availability services in Flink?

2019-08-23 Thread Aleksandar Mastilovic
Hi all,

Since I’m currently working on an implementation of 
HighAvailabilityServicesFactory I thought it would be good to report here about 
my experience so far.

Our use case is cloud based, where we package Flink and our supplementary code 
into a docker image, then run those images through Kubernetes+Helm 
orchestration.

We use neither Hadoop nor HDFS but rather Google Cloud Storage, and we don’t
run ZooKeeper. Our Flink setup consists of one JobManager and multiple
TaskManagers on demand.

Due to the nature of cloud computing there’s a possibility our JobManager
instance might go down, only to be automatically recreated through
Kubernetes. Since we don’t run ZooKeeper, we needed a way to run a variant of
a high-availability cluster where we keep JobManager information on our
attached persistent k8s volume instead of in ZooKeeper.
We found this post on StackOverflow
(https://stackoverflow.com/questions/52104759/apache-flink-on-kubernetes-resume-job-if-jobmanager-crashes/52112538)
and decided to give it a try.

So far we have a setup that seems to be working on our local deployment; we
haven’t yet tried it in the actual cloud.

As far as implementation goes, here’s what we did:

We used MapDB (mapdb.org) as our storage format, to persist
lists of objects onto disk. We partially relied on StandaloneHaServices for our 
HaServices implementation. Otherwise we looked at the ZooKeeperHaServices and 
related classes for inspiration and guidance.

Here’s a list of new classes:

FileSystemCheckpointIDCounter implements CheckpointIDCounter
FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory
FileSystemCompletedCheckpointStore implements CompletedCheckpointStore
FileSystemHaServices extends StandaloneHaServices
FileSystemHaServicesFactory implements HighAvailabilityServicesFactory
FileSystemSubmittedJobGraphStore implements SubmittedJobGraphStore
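
For reference, the wiring for such a factory looks roughly like the sketch
below (interface and config behavior as of the Flink 1.8/1.9 line; the
FileSystemHaServices constructor argument is a placeholder, and it is worth
double-checking the factory interface against the exact version). Pointing
the high-availability option at a factory class selects the custom
FACTORY_CLASS mode:

  # flink-conf.yaml
  high-availability: com.example.ha.FileSystemHaServicesFactory

import java.util.concurrent.Executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;

public class FileSystemHaServicesFactory implements HighAvailabilityServicesFactory {
    @Override
    public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor)
            throws Exception {
        // FileSystemHaServices is the class from the list above; the
        // constructor argument here is illustrative.
        return new FileSystemHaServices(configuration);
    }
}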

Testing so far proved that bringing down a JobManager and bringing it back up 
does indeed restore all the running jobs. Job creation/destruction also works. 

Hope this helps!

Thanks,
Aleksandar Mastilovic

> On Aug 21, 2019, at 12:32 AM, Zili Chen  wrote:
> 
> Hi guys,
> 
> We want to have an accurate idea of how users actually use 
> high-availability services in Flink, especially how you customize
> high-availability services by HighAvailabilityServicesFactory.
> 
> Basically there are standalone impl., zookeeper impl., embedded impl.
> used in MiniCluster, YARN impl. not yet implemented, and a gate to
> customized implementations.
> 
> Generally I think standalone impl. and zookeeper impl. are the most
> widely used implementations. The embedded impl. is used without
> awareness when users run a MiniCluster.
> 
> Besides that, it is helpful to know how you guys customize 
> high-availability services using HighAvailabilityServicesFactory 
> interface for the ongoing FLINK-10333[1] which would evolve 
> high-availability services in Flink, as well as whether any user takes an
> interest in the not-yet-implemented YARN impl.
> 
> Any use case should be helpful. I really appreciate your time and your
> insight.
> 
> Best,
> tison.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10333 
> 


flink sql syntax for accessing object

2019-08-23 Thread Fanbin Bu
Hi,

I have a table whose fields are a Scala case class and a Map. How do I
access their contents?
I tried the following and it doesn't work.

case class MyObject(myField: String)
case class Event(myObject: MyObject, myMap: Map[String, String])
table = tableEnv.fromDataStream[Event](myStream, 'myObject, 'myMap)
tableEnv.scan(tableName).select("myObject.field, myMap['key']")

What is the right way to access those if possible?

Thanks,
Fanbin
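
For reference, in the SQL dialect, composite fields are usually addressed
with dot notation and maps with the bracket syntax. A sketch, assuming the
stream above has been registered as a table named "events" (note the field
is myField, not field, per the case class):

SELECT myObject.myField, myMap['key'] FROM events

In the string-based Table API expressions, the rough equivalents are
composite access via get() and map access via at(), e.g.
select("myObject.get('myField') as myField, myMap.at('key') as v"), though
the SQL form above is the easier one to verify.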


OVER operator filtering out records

2019-08-23 Thread Vinod Mehra
We have a SQL-based Flink job which consumes a very low volume stream (1
or 2 events every few hours):






SELECT
  user_id,
  COUNT(*) OVER (PARTITION BY user_id ORDER BY rowtime
    RANGE INTERVAL '30' DAY PRECEDING) AS count_30_days,
  COALESCE(occurred_at, logged_at) AS latency_marker,
  rowtime
FROM event_foo
WHERE user_id IS NOT NULL

The OVER operator seems to filter out events, as per the Flink dashboard
(records received = records sent = 0).

The operator looks like this:

over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 259200
PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0)) ->
select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker,
rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords ->
sample_without_formatter

I know that the OVER operator can discard late-arriving events, but these
events are definitely not arriving late. The watermark for all operators
stays at 0 because the number of output events is 0.

We have an identical SQL job against a high-volume stream that is working
fine. Watermarks progress in a timely manner and events are delivered in a
timely manner as well.

Any idea what could be going wrong? Are the events getting buffered waiting
for a certain number of events? If so, what is the threshold?

Thanks,
Vinod


kinesis table connector support

2019-08-23 Thread Fanbin Bu
Hi,

Looks like the Flink Table connectors do not include `kinesis` (only
FileSystem, Kafka, and ES); see
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors.
I also found some examples for Kafka:
https://eventador.io/blog/flink_table_and_sql_api_with_apache_flink_16/.
I'm wondering whether there is such a thing for Kinesis as well.

Is there any plan to support this in the future? Otherwise, what needs to
be done if we want to implement it on our own?

Basically, I have a Kinesis stream that emits JSON string data, and I would
like to use the Flink Table/SQL API to do the streaming/batch processing.
Currently, I'm using the DataStream API, which is not as flexible.
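
Until a first-class connector exists, one approach is to wrap the existing
FlinkKinesisConsumer (from flink-connector-kinesis) in a StreamTableSource
and register that with the table environment. A minimal sketch against the
Flink 1.8-era interfaces (the class name, stream name and the one-column
JSON schema are illustrative; double-check the TableSource interface against
the exact Flink version):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// Sketch: expose a Kinesis stream of JSON strings as a one-column table.
public class KinesisJsonTableSource implements StreamTableSource<Row> {

    private final String streamName;
    private final Properties consumerConfig;

    public KinesisJsonTableSource(String streamName, Properties consumerConfig) {
        this.streamName = streamName;
        this.consumerConfig = consumerConfig;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        return env
            .addSource(new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), consumerConfig))
            .map(json -> Row.of(json))   // one column holding the raw JSON
            .returns(getReturnType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return Types.ROW_NAMED(new String[] {"json"}, Types.STRING);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }
}

It would be registered via tableEnv.registerTableSource("kinesis_events",
new KinesisJsonTableSource("my-stream", props)), after which SQL can select
FROM kinesis_events; parsing the JSON into real columns would be the next
step (or a DeserializationSchema that produces Row directly).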

Any help would be appreciated.

Thanks,
Fanbin


Use logback instead of log4j

2019-08-23 Thread Vishwas Siravara
Hi,
From the Flink docs, in order to use logback instead of log4j: "Users
willing to use logback instead of log4j can just exclude log4j (or delete
it from the lib/ folder)."
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html

However, when I delete it from lib/ and start the cluster, there are no
logs generated; instead I see console output which says "Failed to
instantiate SLF4J LoggerFactory"

Reported exception:
java.lang.NoClassDefFoundError: org/apache/log4j/Level
at org.slf4j.LoggerFactory.bind(LoggerFactory.java:143)
at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:122)
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:378)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:328)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:98)


How can I use logback instead ?
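
The stack trace suggests the log4j binding (slf4j-log4j12) is still on the
classpath while log4j itself is gone, so SLF4J keeps trying to bind to
log4j. What has generally worked (jar versions are indicative; match them to
your distribution):

Remove from lib/:
  log4j-1.2.x.jar
  slf4j-log4j12-1.7.x.jar

Add to lib/:
  logback-core-1.2.x.jar
  logback-classic-1.2.x.jar
  log4j-over-slf4j-1.7.x.jar   (only if some dependency still calls the log4j API directly)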


Thanks,
Vishwas


Hive version in Flink

2019-08-23 Thread Yebgenya Lazarkhosrouabadi
Hello,

I'm using Flink on Cloudera-quickstart-vm-5.13 and need to access the Hive
tables. The version of Hive on Cloudera is 1.1.0, but in order to access the
data of the Hive tables a higher version of Hive is needed. Unfortunately it
is not possible to easily change the version of Hive on Cloudera. I have
installed Hive 2.1.0 on Cloudera and want to specify it on Flink 10, so that
Flink uses the new version of Hive and not the 1.1.0 one. How can I do that?
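
For reference, in Flink 1.9 the Hive version is specified when creating the
HiveCatalog; note that 1.9 only supports specific Hive versions, so check
the compatibility list before pointing it at 2.1.0. A sketch (catalog name,
default database and conf dir are placeholders):

import org.apache.flink.table.catalog.hive.HiveCatalog;

// Sketch: register a HiveCatalog pinned to an explicit Hive version.
HiveCatalog hive = new HiveCatalog(
    "myhive",                  // catalog name
    "default",                 // default database
    "/opt/hive-2.1.0/conf",    // directory containing hive-site.xml
    "2.1.0");                  // requested Hive version
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");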

Regards
Yebgenya Lazar
NOTE: This is a confidential message and intended only for the addressee. It
is not permitted to copy this message or to make it available to third
parties. If you have received this message in error, please notify me by
e-mail or at the telephone number given above.


type error with generics ..

2019-08-23 Thread Debasish Ghosh
Hello -

I have the following call to addSource where I pass a Custom SourceFunction
..

env.addSource(
  new CollectionSourceFunctionJ(data, TypeInformation.of(new
TypeHint(){}))
)

where data is List and CollectionSourceFunctionJ is a Scala case
class ..

case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti:
TypeInformation[T]) extends SourceFunction[T] {
  def cancel(): Unit = {}
  def run(ctx: SourceContext[T]): Unit = {
data.asScala.foreach(d ⇒ ctx.collect(d))
  }
}

When the following transformation runs ..

DataStream<Data> ins = readStream(in, Data.class, serdeData);
DataStream<Simple> simples = ins.map((Data d) -> new
Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());

I get the following exception in the second line ..

org.apache.flink.api.common.functions.InvalidTypesException: The return
> type of function 'Custom Source' could not be determined automatically, due
> to type erasure. You can give type information hints by using the
> returns(...) method on the result of the transformation call, or by letting
> your function implement the 'ResultTypeQueryable' interface.


Initially the returns call was not there and I was getting the same
exception. Now after adding the returns call, nothing changes.

Any help will be appreciated ..
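
For what it's worth, the exception points at the source, not at the map: the
returns(...) hint has to be attached where the type is lost, i.e. on the
addSource(...) call, or the source can implement ResultTypeQueryable (as the
message itself suggests). A sketch reusing the names from above (Data and
the surrounding env/data variables come from the snippets; the rest is
illustrative):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: attach the element type to the source itself, so type erasure
// does not hide it from Flink.
TypeInformation<Data> ti = TypeInformation.of(new TypeHint<Data>() {});
DataStream<Data> source = env
    .addSource(new CollectionSourceFunctionJ<>(data, ti))
    .returns(ti); // hint on the source, not on a downstream map()

// Alternatively, let CollectionSourceFunctionJ extend ResultTypeQueryable[T]
// on the Scala side and return 'ti' from getProducedType(); then no
// returns() call is needed.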

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Oytun Tez
Hi all,

We also had to roll back our upgrade effort for 2 reasons:

- Official Docker container is not ready yet
- This artefact is not published with
scala: org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.9.0






---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Aug 23, 2019 at 10:48 AM Zili Chen  wrote:

> Hi Till,
>
> Did we mention this in release note(or maybe previous release note where
> we did the exclusion)?
>
> Best,
> tison.
>
>
> Till Rohrmann  于2019年8月23日周五 下午10:28写道:
>
>> Hi Gavin,
>>
>> if I'm not mistaken, then the community excluded the Scala FlinkShell
>> since a couple of versions for Scala 2.12. The problem seems to be that
>> some of the tests failed. See here [1] for more information.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10911
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 23, 2019 at 11:44 AM Gavin Lee  wrote:
>>
>>> I used package on apache official site, with mirror [1], the difference
>>> is
>>> I used scala 2.12 version.
>>> I also tried to build from source for both scala 2.11 and 2.12, when
>>> build
>>> 2.12 the FlinkShell.class is in flink-dist jar file but after running mvn
>>> clean package -Dscala-2.12, this class was removed in
>>> flink-dist_2.12-1.9 jar
>>> file.
>>> Seems broken here for scala 2.12, right?
>>>
>>> [1]
>>>
>>> http://mirror.bit.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz
>>>
>>> On Fri, Aug 23, 2019 at 4:37 PM Zili Chen  wrote:
>>>
>>> > I downloaded 1.9.0 dist from here[1] and didn't see the issue. Where
>>> do you
>>> > download it? Could you try to download the dist from [1] and see
>>> whether
>>> > the problem last?
>>> >
>>> > Best,
>>> > tison.
>>> >
>>> > [1]
>>> >
>>> http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
>>> >
>>> >
>>> > Gavin Lee  于2019年8月23日周五 下午4:34写道:
>>> >
>>> >> Thanks for your reply @Zili.
>>> >> I'm afraid it's not the same issue.
>>> >> I found that the FlinkShell.class was not included in flink dist jar
>>> file
>>> >> in 1.9.0 version.
>>> >> Nowhere can find this class file inside jar, either in opt or lib
>>> >> directory under root folder of flink distribution.
>>> >>
>>> >>
>>> >> On Fri, Aug 23, 2019 at 4:10 PM Zili Chen 
>>> wrote:
>>> >>
>>> >>> Hi Gavin,
>>> >>>
>>> >>> I also find a problem in shell if the directory contain whitespace
>>> >>> then the final command to run is incorrect. Could you check the
>>> >>> final command to be executed?
>>> >>>
>>> >>> FYI, here is the ticket[1].
>>> >>>
>>> >>> Best,
>>> >>> tison.
>>> >>>
>>> >>> [1] https://issues.apache.org/jira/browse/FLINK-13827
>>> >>>
>>> >>>
>>> >>> Gavin Lee  于2019年8月23日周五 下午3:36写道:
>>> >>>
>>>  Why bin/start-scala-shell.sh local return following error?
>>> 
>>>  bin/start-scala-shell.sh local
>>> 
>>>  Error: Could not find or load main class
>>>  org.apache.flink.api.scala.FlinkShell
>>>  For flink 1.8.1 and previous ones, no such issues.
>>> 
>>>  On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
>>> 
>>> > Congratulations and thanks for the hard work!
>>> >
>>> > Qi
>>> >
>>> > On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org>
>>> > wrote:
>>> >
>>> > The Apache Flink community is very happy to announce the release of
>>> > Apache Flink 1.9.0, which is the latest major release.
>>> >
>>> > Apache Flink® is an open-source stream processing framework for
>>> > distributed, high-performing, always-available, and accurate data
>>> streaming
>>> > applications.
>>> >
>>> > The release is available for download at:
>>> > https://flink.apache.org/downloads.html
>>> >
>>> > Please check out the release blog post for an overview of the
>>> > improvements for this new major release:
>>> > https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>>> >
>>> > The full release notes are available in Jira:
>>> >
>>> >
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>>> >
>>> > We would like to thank all contributors of the Apache Flink
>>> community
>>> > who made this release possible!
>>> >
>>> > Cheers,
>>> > Gordon
>>> >
>>> >
>>> >
>>> 
>>>  --
>>>  Gavin
>>> 
>>> >>>
>>> >>
>>> >> --
>>> >> Gavin
>>> >>
>>> >
>>>
>>> --
>>> Gavin
>>>
>>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Zili Chen
Hi Till,

Did we mention this in the release notes (or maybe in a previous release
note, where we did the exclusion)?

Best,
tison.


Till Rohrmann  于2019年8月23日周五 下午10:28写道:

> Hi Gavin,
>
> if I'm not mistaken, then the community excluded the Scala FlinkShell
> since a couple of versions for Scala 2.12. The problem seems to be that
> some of the tests failed. See here [1] for more information.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10911
>
> Cheers,
> Till
>
> On Fri, Aug 23, 2019 at 11:44 AM Gavin Lee  wrote:
>
>> I used package on apache official site, with mirror [1], the difference is
>> I used scala 2.12 version.
>> I also tried to build from source for both scala 2.11 and 2.12, when build
>> 2.12 the FlinkShell.class is in flink-dist jar file but after running mvn
>> clean package -Dscala-2.12, this class was removed in flink-dist_2.12-1.9
>> jar
>> file.
>> Seems broken here for scala 2.12, right?
>>
>> [1]
>>
>> http://mirror.bit.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz
>>
>> On Fri, Aug 23, 2019 at 4:37 PM Zili Chen  wrote:
>>
>> > I downloaded 1.9.0 dist from here[1] and didn't see the issue. Where do
>> you
>> > download it? Could you try to download the dist from [1] and see whether
>> > the problem last?
>> >
>> > Best,
>> > tison.
>> >
>> > [1]
>> >
>> http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
>> >
>> >
>> > Gavin Lee  于2019年8月23日周五 下午4:34写道:
>> >
>> >> Thanks for your reply @Zili.
>> >> I'm afraid it's not the same issue.
>> >> I found that the FlinkShell.class was not included in flink dist jar
>> file
>> >> in 1.9.0 version.
>> >> Nowhere can find this class file inside jar, either in opt or lib
>> >> directory under root folder of flink distribution.
>> >>
>> >>
>> >> On Fri, Aug 23, 2019 at 4:10 PM Zili Chen 
>> wrote:
>> >>
>> >>> Hi Gavin,
>> >>>
>> >>> I also find a problem in shell if the directory contain whitespace
>> >>> then the final command to run is incorrect. Could you check the
>> >>> final command to be executed?
>> >>>
>> >>> FYI, here is the ticket[1].
>> >>>
>> >>> Best,
>> >>> tison.
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-13827
>> >>>
>> >>>
>> >>> Gavin Lee  于2019年8月23日周五 下午3:36写道:
>> >>>
>>  Why bin/start-scala-shell.sh local return following error?
>> 
>>  bin/start-scala-shell.sh local
>> 
>>  Error: Could not find or load main class
>>  org.apache.flink.api.scala.FlinkShell
>>  For flink 1.8.1 and previous ones, no such issues.
>> 
>>  On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
>> 
>> > Congratulations and thanks for the hard work!
>> >
>> > Qi
>> >
>> > On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org>
>> > wrote:
>> >
>> > The Apache Flink community is very happy to announce the release of
>> > Apache Flink 1.9.0, which is the latest major release.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> > improvements for this new major release:
>> > https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>> >
>> > The full release notes are available in Jira:
>> >
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>> >
>> > We would like to thank all contributors of the Apache Flink
>> community
>> > who made this release possible!
>> >
>> > Cheers,
>> > Gordon
>> >
>> >
>> >
>> 
>>  --
>>  Gavin
>> 
>> >>>
>> >>
>> >> --
>> >> Gavin
>> >>
>> >
>>
>> --
>> Gavin
>>
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Till Rohrmann
Hi Gavin,

if I'm not mistaken, the community has excluded the Scala FlinkShell for
Scala 2.12 for a couple of versions now. The problem seems to be that some
of the tests failed. See here [1] for more information.

[1] https://issues.apache.org/jira/browse/FLINK-10911

Cheers,
Till

On Fri, Aug 23, 2019 at 11:44 AM Gavin Lee  wrote:

> I used package on apache official site, with mirror [1], the difference is
> I used scala 2.12 version.
> I also tried to build from source for both scala 2.11 and 2.12, when build
> 2.12 the FlinkShell.class is in flink-dist jar file but after running mvn
> clean package -Dscala-2.12, this class was removed in flink-dist_2.12-1.9
> jar
> file.
> Seems broken here for scala 2.12, right?
>
> [1]
>
> http://mirror.bit.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz
>
> On Fri, Aug 23, 2019 at 4:37 PM Zili Chen  wrote:
>
> > I downloaded 1.9.0 dist from here[1] and didn't see the issue. Where do
> you
> > download it? Could you try to download the dist from [1] and see whether
> > the problem last?
> >
> > Best,
> > tison.
> >
> > [1]
> >
> http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
> >
> >
> > Gavin Lee  于2019年8月23日周五 下午4:34写道:
> >
> >> Thanks for your reply @Zili.
> >> I'm afraid it's not the same issue.
> >> I found that the FlinkShell.class was not included in flink dist jar
> file
> >> in 1.9.0 version.
> >> Nowhere can find this class file inside jar, either in opt or lib
> >> directory under root folder of flink distribution.
> >>
> >>
> >> On Fri, Aug 23, 2019 at 4:10 PM Zili Chen  wrote:
> >>
> >>> Hi Gavin,
> >>>
> >>> I also find a problem in shell if the directory contain whitespace
> >>> then the final command to run is incorrect. Could you check the
> >>> final command to be executed?
> >>>
> >>> FYI, here is the ticket[1].
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-13827
> >>>
> >>>
> >>> Gavin Lee  于2019年8月23日周五 下午3:36写道:
> >>>
>  Why bin/start-scala-shell.sh local return following error?
> 
>  bin/start-scala-shell.sh local
> 
>  Error: Could not find or load main class
>  org.apache.flink.api.scala.FlinkShell
>  For flink 1.8.1 and previous ones, no such issues.
> 
>  On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
> 
> > Congratulations and thanks for the hard work!
> >
> > Qi
> >
> > On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org>
> > wrote:
> >
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.9.0, which is the latest major release.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> > improvements for this new major release:
> > https://flink.apache.org/news/2019/08/22/release-1.9.0.html
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> > Cheers,
> > Gordon
> >
> >
> >
> 
>  --
>  Gavin
> 
> >>>
> >>
> >> --
> >> Gavin
> >>
> >
>
> --
> Gavin
>


Re: Problem with Flink on Yarn

2019-08-23 Thread Zhu Zhu
Hi Juan,

Have you tried a Flink release built with Hadoop 2.7 or a later version?
If you are using Flink 1.8/1.9, it should be the pre-bundled Hadoop 2.7+
jar, which can be found on the Flink download page.

I think YARN-3103 is about AMRMClientImpl.class, and it is in the Flink
shaded Hadoop jar.

Thanks,
Zhu Zhu

Juan Gentile  于2019年8月23日周五 下午7:48写道:

> Hello!
>
>
>
> We are running Flink on Yarn and we are currently getting the following
> error:
>
>
>
> 2019-08-23 06:11:01,534 WARN  org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as: (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
> 2019-08-23 06:11:01,535 WARN  org.apache.hadoop.ipc.Client - Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
> 2019-08-23 06:11:01,536 WARN  org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:  (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
> 2019-08-23 06:11:01,581 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler - Exception while invoking ApplicationMasterProtocolPBClientImpl.allocate over rm0. Not retrying because Invalid or Cancelled Token
> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1564713228886_5299648_01
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
> at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
> at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
> at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:288)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:206)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:188)
> at com.sun.proxy.$Proxy26.allocate(Unknown Source)
> at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)
> at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:224)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1564713228886_5299648_01
> at org.apache.hadoop.ipc.Client.call(Client.java:1472)
> at org.apache.hadoop.ipc.Client.call(Client.java:1409)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
> at com.sun.proxy.$Proxy25.allocate(Unknown Source)
> at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
> ... 9 more
>
>
>
> The Flink cluster runs OK for a while, but then after a day we get this
> error again. We haven't made changes to our code, which is why it's hard
> to understand why we suddenly started to see this.
>
>
>
> We found this issue reported on Yarn
> https://issues.apache.org/jira/browse/YARN-3103 but our version of Yarn
> already has that fix.
>
>
>
> Any help will be appreciated.
>
>
>
> Thank you,
>
> Juan
>


Are there any news on custom trigger support for SQL/Table API?

2019-08-23 Thread Theo Diefenthal
Hi there, 

I am currently evaluating letting our experienced system users write Flink
SQL queries directly. At the moment, all queries our users need are
implemented programmatically.

There is one major problem preventing us from just giving SQL to our users
directly. Almost all of our users' queries are "threshold-based": something
like "Notify me directly if there were (>=) 10 machine restarts today". So
the time window is one day, but if there are already 10 outages at 10:00, we
must trigger the window.

In our programmed pipelines, that's rather easy to accomplish: we just build
count triggers. In Flink SQL, this seems not to be possible at all. See [1].
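
For context, the programmatic version referred to above looks roughly like
the sketch below (the Restart event type, its machineId key and the window
function are illustrative). Note that replacing the default trigger with a
pure count trigger means the window no longer fires at the end of the day;
combining count-based and end-of-window firing needs a small custom Trigger.

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// Sketch: daily event-time window that fires as soon as the 10th restart
// for a machine arrives.
restarts
    .keyBy(r -> r.machineId)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .trigger(CountTrigger.of(10))
    .process(new NotifyOnThreshold()); // illustrative window function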

That mailing list entry references a Word draft document talking about SQL
syntax enrichments (an EMIT clause), but as a pre-step it seems to be
planned that Flink could provide trigger customization via QueryConfig:

" Flink’s Table API / SQL features a QueryConfig object to configure the 
execution of a streaming query. So far (Flink 1.3), it only allows to specify a 
state retention period but we plan to extend it with trigger policies. This way 
we do not have to extend SQL itself to specify triggering policies. However, an 
EMIT statement could of course override the configuration of a QueryConfig. So 
from our point of view it would be an optional alternative. " 

That mailing list article is over a year old, and the Word doc refers to
Flink 1.3. I checked QueryConfig in my Flink 1.8.0 and it sadly still does
not seem to support this. So my question is: is there any timeline / roadmap
/ JIRA issue to track this, and is this still a planned feature
(near/mid/long-term)? Just being able to change the trigger via QueryConfig
would already be really great for us, and having the EMIT clause would of
course be awesome.

Best regards 
Theo 

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Do-Not-Support-Custom-Trigger-td20932.html
 


Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-23 Thread Theo Diefenthal
Hi Fabian, Hi Zhenghua 

Thank you for your suggestions and for telling me that I was on the right
track. It is also good to know how to find out whether something yields a
time-bounded or a regular join.

@Fabian: Regarding your suggested first option: Isn't that exactly what my
first try was, with the TUMBLE_START? That sadly didn't work due to "Rowtime
attributes must not be in the input rows of a regular join". But I'll give
option 2 a try by just adding another attribute.

One addition regarding my second try: I wrote that the reduced query didn't
produce any data, but that was indeed my mistake. While testing all of those
combinations I fiddled around with my data too much and manipulated the
original data in a way that the query could no longer output a result. Now
the second attempt works, but it isn't really what I wanted to query (as the
"same day" predicate is still missing).

Best regards 
Theo 


Von: "Fabian Hueske"  
An: "Zhenghua Gao"  
CC: "Theo Diefenthal" , "user" 
 
Gesendet: Freitag, 16. August 2019 10:05:45 
Betreff: Re: I'm not able to make a stream-stream Time windows JOIN in Flink 
SQL 

Hi Theo, 

The main problem is that the semantics of your join (Join all events that 
happened on the same day) are not well-supported by Flink yet. 

In terms of true streaming joins, Flink supports the time-windowed join (with 
the BETWEEN predicate) and the time-versioned table join (which does not apply 
here). 
The first does not really fit because it puts the windows "around the event", 
i.e., if you have an event at 12:35 and a window of 10 mins earlier and 15 mins 
later, it will join with events between 12:25 and 12:50. 
Another limitation of Flink is that you cannot modify event-time attributes
(well, you can, but they lose their event-time property and become regular
TIMESTAMP attributes).
This limitation exists because we must ensure that the attributes are still
aligned with watermarks after they are modified (or that the watermarks are
adjusted accordingly).
Since analyzing expressions that modify timestamps to figure out whether
they preserve watermark alignment is very difficult, we opted to always
remove the event-time property when an event-time attribute is modified.

I see two options for your use case: 

1) use the join that you described before with the -24 and +24 hour window and 
apply more fine-grained predicates to filter out the join results that you 
don't need. 
2) add an additional time attribute to your input that is a rounded down 
version of the timestamp (rounded to 24h), declare the rounded timestamp as 
your event-time attribute, and join with an equality predicate on the rounded 
timestamp. 
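
A sketch of option 2 in SQL, assuming both tables expose day_rt, the rowtime
attribute already rounded down to the day before it is declared (since, as
noted above, it cannot be rounded inside the query without losing the
event-time property); table and column names follow the original question:

SELECT s1.user, s2.userName
FROM stream1 s1 JOIN stream2 s2
  ON s1.id = s2.id
 AND s1.day_rt = s2.day_rt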

Best, Fabian 

Am Di., 13. Aug. 2019 um 13:41 Uhr schrieb Zhenghua Gao < [ 
mailto:doc...@gmail.com | doc...@gmail.com ] >: 



I wrote a demo example for the time-windowed join which you can pick up [1].
[1] https://gist.github.com/docete/8e78ff8b5d0df69f60dda547780101f1

Best Regards, 
Zhenghua Gao 


On Tue, Aug 13, 2019 at 4:13 PM Zhenghua Gao <doc...@gmail.com> wrote:


You can check the plan after optimization to verify whether it's a regular
join or a time-bounded join (there should be a WindowJoin). The most direct
way is a breakpoint at the optimizing phase [1][2].
And you can use your test data to create an ITCase for debugging [3].


[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
[3] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala

Best Regards, 
Zhenghua Gao 


On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal
<theo.diefent...@scoop-software.de> wrote:


Hi there, 

Currently, I'm trying to write a SQL query which shall execute a
time-windowed/bounded JOIN on two data streams.

Suppose I have stream1 with attributes id, ts, user and stream2 with
attributes id, ts, userName. I want to receive the natural JOIN of both
streams with events of the same day.

In Oracle (With a ts column as number instead of Timestamp, for

Problem with Flink on Yarn

2019-08-23 Thread Juan Gentile
Hello!

We are running Flink on Yarn and we are currently getting the following error:

2019-08-23 06:11:01,534 WARN  org.apache.hadoop.security.UserGroupInformation   
- PriviledgedActionException as: (auth:KERBEROS) 
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 Invalid AMRMToken from appattempt_1564713228886_5299648_01
2019-08-23 06:11:01,535 WARN  org.apache.hadoop.ipc.Client  
- Exception encountered while connecting to the server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 Invalid AMRMToken from appattempt_1564713228886_5299648_01
2019-08-23 06:11:01,536 WARN  org.apache.hadoop.security.UserGroupInformation   
- PriviledgedActionException as:  (auth:KERBEROS) 
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 Invalid AMRMToken from appattempt_1564713228886_5299648_01
2019-08-23 06:11:01,581 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler 
- Exception while invoking 
ApplicationMasterProtocolPBClientImpl.allocate over rm0. Not retrying because 
Invalid or Cancelled Token
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken 
from appattempt_1564713228886_5299648_01
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at 
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:288)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:206)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:188)
at com.sun.proxy.$Proxy26.allocate(Unknown Source)
at 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:224)
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 Invalid AMRMToken from appattempt_1564713228886_5299648_01
at org.apache.hadoop.ipc.Client.call(Client.java:1472)
at org.apache.hadoop.ipc.Client.call(Client.java:1409)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
at com.sun.proxy.$Proxy25.allocate(Unknown Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
... 9 more

The Flink cluster runs ok for a while but then after a day we get this error 
again. We haven’t made changes to our code so that’s why it’s hard to 
understand why all of a sudden we started to see this.

We found this issue reported on Yarn 
https://issues.apache.org/jira/browse/YARN-3103 but our version of Yarn already 
has that fix.

Any help will be appreciated.

Thank you,
Juan


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Gavin Lee
I used the package from the official Apache site, via mirror [1]; the
difference is that I used the Scala 2.12 version.
I also tried to build from source for both Scala 2.11 and 2.12. When
building 2.12, FlinkShell.class is in the flink-dist jar file, but after
running mvn clean package -Dscala-2.12 this class was removed from the
flink-dist_2.12-1.9 jar file.
Seems broken here for Scala 2.12, right?

[1]
http://mirror.bit.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz

On Fri, Aug 23, 2019 at 4:37 PM Zili Chen  wrote:

> I downloaded 1.9.0 dist from here[1] and didn't see the issue. Where do you
> download it? Could you try to download the dist from [1] and see whether
> the problem last?
>
> Best,
> tison.
>
> [1]
> http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
>
>
> Gavin Lee  于2019年8月23日周五 下午4:34写道:
>
>> Thanks for your reply @Zili.
>> I'm afraid it's not the same issue.
>> I found that the FlinkShell.class was not included in flink dist jar file
>> in 1.9.0 version.
>> Nowhere can find this class file inside jar, either in opt or lib
>> directory under root folder of flink distribution.
>>
>>
>> On Fri, Aug 23, 2019 at 4:10 PM Zili Chen  wrote:
>>
>>> Hi Gavin,
>>>
>>> I also find a problem in shell if the directory contain whitespace
>>> then the final command to run is incorrect. Could you check the
>>> final command to be executed?
>>>
>>> FYI, here is the ticket[1].
>>>
>>> Best,
>>> tison.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13827
>>>
>>>
>>> Gavin Lee  于2019年8月23日周五 下午3:36写道:
>>>
 Why bin/start-scala-shell.sh local return following error?

 bin/start-scala-shell.sh local

 Error: Could not find or load main class
 org.apache.flink.api.scala.FlinkShell
 For flink 1.8.1 and previous ones, no such issues.

 On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:

> Congratulations and thanks for the hard work!
>
> Qi
>
> On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.9.0, which is the latest major release.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data 
> streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this new major release:
> https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Cheers,
> Gordon
>
>
>

 --
 Gavin

>>>
>>
>> --
>> Gavin
>>
>

-- 
Gavin


Per Partition Watermarking source idleness

2019-08-23 Thread Prakhar Mathur
Hi,

We are using Flink v1.6. We are facing data loss issues while consuming
data from older offsets in Kafka with windowing. We are exploring a
per-partition watermarking strategy. But we noticed that when we are trying
to consume from multiple topics, if any of the partitions is not receiving
data, it just blocks everything. Is there a known solution for this?


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Zili Chen
I downloaded the 1.9.0 dist from here [1] and didn't see the issue. Where
did you download it? Could you try to download the dist from [1] and see
whether the problem persists?

Best,
tison.

[1]
http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz


Gavin Lee  于2019年8月23日周五 下午4:34写道:

> Thanks for your reply @Zili.
> I'm afraid it's not the same issue.
> I found that the FlinkShell.class was not included in flink dist jar file
> in 1.9.0 version.
> Nowhere can find this class file inside jar, either in opt or lib
> directory under root folder of flink distribution.
>
>
> On Fri, Aug 23, 2019 at 4:10 PM Zili Chen  wrote:
>
>> Hi Gavin,
>>
>> I also find a problem in shell if the directory contain whitespace
>> then the final command to run is incorrect. Could you check the
>> final command to be executed?
>>
>> FYI, here is the ticket[1].
>>
>> Best,
>> tison.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13827
>>
>>
>> Gavin Lee  于2019年8月23日周五 下午3:36写道:
>>
>>> Why bin/start-scala-shell.sh local return following error?
>>>
>>> bin/start-scala-shell.sh local
>>>
>>> Error: Could not find or load main class
>>> org.apache.flink.api.scala.FlinkShell
>>> For flink 1.8.1 and previous ones, no such issues.
>>>
>>> On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
>>>
 Congratulations and thanks for the hard work!

 Qi

 On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai 
 wrote:

 The Apache Flink community is very happy to announce the release of
 Apache Flink 1.9.0, which is the latest major release.

 Apache Flink® is an open-source stream processing framework for
 distributed, high-performing, always-available, and accurate data streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements for this new major release:
 https://flink.apache.org/news/2019/08/22/release-1.9.0.html

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601

 We would like to thank all contributors of the Apache Flink community
 who made this release possible!

 Cheers,
 Gordon



>>>
>>> --
>>> Gavin
>>>
>>
>
> --
> Gavin
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Gavin Lee
Thanks for your reply @Zili.
I'm afraid it's not the same issue.
I found that FlinkShell.class is not included in the flink-dist jar file
in the 1.9.0 version.
I can find this class file nowhere inside a jar, either in the opt or lib
directory under the root folder of the Flink distribution.


On Fri, Aug 23, 2019 at 4:10 PM Zili Chen  wrote:

> Hi Gavin,
>
> I also find a problem in shell if the directory contain whitespace
> then the final command to run is incorrect. Could you check the
> final command to be executed?
>
> FYI, here is the ticket[1].
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13827
>
>
> Gavin Lee  于2019年8月23日周五 下午3:36写道:
>
>> Why bin/start-scala-shell.sh local return following error?
>>
>> bin/start-scala-shell.sh local
>>
>> Error: Could not find or load main class
>> org.apache.flink.api.scala.FlinkShell
>> For flink 1.8.1 and previous ones, no such issues.
>>
>> On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
>>
>>> Congratulations and thanks for the hard work!
>>>
>>> Qi
>>>
>>> On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.9.0, which is the latest major release.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>> https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>
>> --
>> Gavin
>>
>

-- 
Gavin


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Zili Chen
Hi Gavin,

I also found a problem in the shell: if the directory contains whitespace,
the final command to run is incorrect. Could you check the
final command being executed?

FYI, here is the ticket[1].

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-13827
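
As a quick illustration of the failure mode (a hypothetical path, shown only
to demonstrate the general quoting pitfall, not the actual script contents):

    DIR="/opt/flink 1.9.0"          # note the space
    java -cp $DIR/lib/* Foo         # unquoted: the shell splits the path in two
    java -cp "$DIR/lib/*" Foo       # quoted: one class path entry; the JVM expands the *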


On Fri, Aug 23, 2019 at 3:36 PM Gavin Lee  wrote:

> Why does bin/start-scala-shell.sh local return the following error?
>
> bin/start-scala-shell.sh local
>
> Error: Could not find or load main class
> org.apache.flink.api.scala.FlinkShell
> Flink 1.8.1 and earlier versions have no such issue.
>
> On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
>
>> Congratulations and thanks for the hard work!
>>
>> Qi
>>
>> On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.9.0, which is the latest major release.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this new major release:
>> https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Cheers,
>> Gordon
>>
>>
>>
>
> --
> Gavin
>


Re: Can I use watermarks to have a global trigger of different ProcessFunctions?

2019-08-23 Thread David Anderson
If you want to use event time processing with in-order data, then you
can use an AscendingTimestampExtractor [1].

David

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamp_extractors.html#assigners-with-ascending-timestamps
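
A minimal sketch, assuming an event type MyEvent with a long timestamp field
(the names are illustrative):

    DataStream<MyEvent> withTimestamps =
        stream.assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<MyEvent>() {
                @Override
                public long extractAscendingTimestamp(MyEvent element) {
                    // timestamps must be monotonically increasing per parallel source
                    return element.timestamp;
                }
            });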

On Thu, Aug 22, 2019 at 4:03 PM Felipe Gutierrez
 wrote:
>
> thanks for the detailed explanation! I removed my watermark implementation,
> which is not necessary in my case. I will only use watermarks if
> I am dealing with out-of-order events.
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Wed, Aug 21, 2019 at 9:09 PM David Anderson  wrote:
>>
>> What Watermarks do is to advance the event time clock. You can
>> consider a Watermark(t) as an assertion about the completeness of the
>> stream -- it marks a point in the stream and says that at that point,
>> the stream is (probably) now complete up to time t.
>>
>> The autoWatermarkInterval determines how often new Watermarks are
>> created -- or in other words, how often the event-time clock will be
>> able to move forward. From what you've presented, it seems like you
>> can leave this at its default, which is 200 msec. This means that five
>> times a second, as your application runs, each parallel instance will
>> create a new watermark (assuming there's been new data and that the
>> event time clock can be advanced).
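>>
>> For reference, this interval can also be set explicitly on the execution
>> config (a minimal sketch; 200 ms is already the default):
>>
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     env.getConfig().setAutoWatermarkInterval(200); // milliseconds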
>>
>> getCurrentWatermark() should NOT be implemented in terms of
>> System.currentTimeMillis -- you do not want your watermarking to
>> depend on the current processing time if you can possibly avoid it.
>> Part of the beauty of event time processing is being able to run your
>> application on historic data as well as live, real-time data, and this
>> is only possible if your watermarks depend on timestamps recorded in
>> the events, rather than System.currentTimeMillis.
>>
>> You should also try to decouple your watermarking strategy from the
>> specific processing you intend to later, downstream. The primary
>> concern you need to have when implementing the watermarking is to
>> consider how much out-of-orderness your data may have. A typical
>> timestamp assigner and watermark generator will look something like
>> this, assuming that your event stream will have its timestamps at most
>> 10 seconds out of order, and that your events have a timestamp field:
>>
>> DataStream<MyEvent> withTimestampsAndWatermarks =
>>     stream.assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>>
>>             @Override
>>             public long extractTimestamp(MyEvent element) {
>>                 return element.timestamp;
>>             }
>>         });
>>
>> As for your specific application requirements, you might find it
>> simpler to rely on State Time-to-Live [1] rather than clearing state
>> yourself.
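>>
>> A minimal sketch of enabling TTL on a state descriptor (assuming a
>> ValueState holding a Long; note that state TTL is based on processing
>> time, not event time):
>>
>>     StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(1))
>>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>         .build();
>>
>>     ValueStateDescriptor<Long> descriptor =
>>         new ValueStateDescriptor<>("myState", Long.class);
>>     descriptor.enableTimeToLive(ttlConfig);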
>>
>> There's no need to retain the state until the windowed join is
>> completed, since the operator executing the join can't access the
>> state in the CoProcessFunction. The CoProcessFunction should clear the
>> state whenever it is done with it; no other part of your job will
>> access it.
>>
>> If there is a risk that the CoProcessFunction will create state that
>> isn't freed, and you don't for some reason find State TTL a good
>> solution for this, then you can use either a processing time or event
>> time timer to trigger a call to onTimer in which you can free the
>> state. For example,
>>
>> timerService.registerEventTimeTimer(event.getEventTime() + 60 * 1000);
>>
>> registers an event time timer for 60 seconds after the timestamp in an
>> event -- meaning, take the event's timestamp, add 60 seconds, and wait
>> until the current Watermark has surpassed that point in time.
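>>
>> and the matching onTimer callback might look like this (a sketch, not the
>> full function; myState and Output are hypothetical names):
>>
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out) {
>>         myState.clear(); // free the per-key state once the timer fires
>>     }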
>>
>> The Flink training website has tutorials [2] and exercises [3] on these 
>> topics.
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
>> [2] https://training.ververica.com/lessons/event-time-watermarks.html
>> [3] 
>> https://training.ververica.com/exercises/rideEnrichment-processfunction.html
>>
>>
>> On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez
>>  wrote:
>> >
>> > Hi,
>> >
>> > I am a little confused about watermarks in Flink.
>> >
>> > My application is using EventTime. My sources are calling 
>> > ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a 
>> > CoProcessFunction which merges the two streams. I have a state on this
>> > function and I want to clean this state every time that I trigger the 
>> > window of my next operator. The next operator is a join which is using a 
>> > window of 1 minute [1].
>> >
>> > stream01 = source01.connect(sideoutput02).keyBy().process(new 
>> > MyCoProcessFunction);
>> > stream02 = source02.connect(sideoutput01).keyBy().process(new 
>> > MyCoProcessFunction);
>> > stream01.join(stream02).window(60 sec).apply(new MyJoinFunction).print();
>> >
>> > I am confused if I have to use env.getConfig().setAutoWatermarkInterval().

Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Gavin Lee
Why bin/start-scala-shell.sh local return following error?

bin/start-scala-shell.sh local

Error: Could not find or load main class
org.apache.flink.api.scala.FlinkShell
For flink 1.8.1 and previous ones, no such issues.

On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:

> Congratulations and thanks for the hard work!
>
> Qi
>
> On Aug 22, 2019, at 8:03 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.9.0, which is the latest major release.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this new major release:
> https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Cheers,
> Gordon
>
>
>

-- 
Gavin


checkpoint failure suddenly even though state size is only around 10 MB

2019-08-23 Thread Sushant Sawant
Hi all,
I'm facing two issues which I believe are correlated:
1. Kafka source shows high back pressure.
2. Sudden checkpoint failures for the entire day, until restart.

My job does the following:
a. Read from Kafka
b. Asyncio to external system
c. Dumping in Cassandra, Elasticsearch

Checkpointing uses the filesystem state backend.
This Flink job is proven under high load,
around 5000 events/sec throughput.
But recently we scaled down the parallelism, since there wasn't any load in
production, and these issues started.

Please find the status shown by the Flink dashboard.
This GitHub folder contains images from when there was high back pressure and
checkpoints were failing:
https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
and this folder contains the "everything is fine" images from after the restart:
https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing

--
Could anyone point me in the right direction as to what might have gone
wrong, or how to troubleshoot?


Thanks & Regards,
Sushant Sawant