Re: Kafka table descriptor missing startFromTimestamp()

2019-12-19 Thread Dawid Wysakowicz
Hi,

The only reason it was not exposed at the beginning is that not all
versions of the consumers support starting from a specific timestamp. I
think we could expose such a setting now. Would you like to create an
issue for it?

Best,

Dawid

On 19/12/2019 06:56, Steve Whelan wrote:
> Examining the org.apache.flink.table.descriptors.Kafka class in Flink
> v1.8, it has the following startUpModes for consumers:
>     .startFromEarliest()
>     .startFromLatest()
>     .startFromSpecificOffsets(...)
>
> However, it does not have a method to support starting from a
> timestamp, although the FlinkKafkaConsumer supports this feature. Was it
> a conscious decision to leave that startup mode out of the table
> descriptor? If so, what was the reasoning?





Re: How to convert retract stream to dynamic table?

2019-12-19 Thread Dawid Wysakowicz
Hi,

Correct me if I am wrong, James, but I think your original question was
how to create a Table out of a changelog (a stream with a change
flag). Unfortunately, I think that is not possible right now. It is
definitely high on our priority list for the near future. There have been
initial attempts [1] to implement it, but we must first clarify all
aspects of such an operation.
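
To make the notion of a "semantically equivalent table" concrete, here is a
minimal sketch in plain Scala (no Flink API involved) of what materializing a
retract stream would mean: a row flagged `true` is added, a row flagged
`false` is retracted. The names `RetractToTable`, `Change` and `materialize`
are hypothetical and chosen only for this illustration; this is not how Flink
implements it.

```scala
object RetractToTable {
  // One change record: (flag, user, cnt). flag = true adds the row, false retracts it.
  type Change = (Boolean, String, Long)

  // Folds the changelog into a multiset of rows: row -> number of live copies.
  def materialize(changes: Seq[Change]): Map[(String, Long), Int] =
    changes.foldLeft(Map.empty[(String, Long), Int]) {
      case (table, (flag, user, cnt)) =>
        val row = (user, cnt)
        val copies = table.getOrElse(row, 0) + (if (flag) 1 else -1)
        // A row with no live copies left disappears from the table.
        if (copies <= 0) table - row else table.updated(row, copies)
    }
}
```

For an assumed changelog in which User1's count goes from 1 to 2,
`RetractToTable.materialize(Seq((true, "User1", 1L), (false, "User1", 1L), (true, "User1", 2L)))`
leaves a table with the single row `("User1", 2)`.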

Best,

Dawid

[1] https://github.com/apache/flink/pull/6787

On 19/12/2019 04:05, Kurt Young wrote:
> Hi James,
>
> If I understand correctly, you can use `TableEnvironment#sqlQuery` to
> achieve what you want. You can pass the whole SQL statement in and get a
> `Table` back from the method. I believe this is the table you want, which
> is semantically equivalent to the stream you mentioned.
>
> For example, you can further operate on the `Table` with other SQL
> operations, like `GROUP BY cnt` on the returned table. You can think of it
> this way: Flink would attach another aggregation operator to the original
> plan, and this operator can consume the retraction stream which the
> original SQL statement produced and start to generate correct results.
>
> Best,
> Kurt
>
>
> On Thu, Dec 19, 2019 at 1:25 AM James Baker wrote:
>
> Hi!
> I've been looking at Flink for the last few days and have very
> much appreciated the concept of Dynamic Tables, it solves a lot of
> my needs and handles a lot of the complex state tracking that is
> otherwise painful. I have a question about the composability of
> the system which the docs don't answer.
>
> The docs use the example of 'SELECT user, COUNT(url) as cnt FROM
> clicks GROUP BY user', where clicks is a stream coming in of user
> and the url they've clicked.
>
> From such a Table, I can then get a retract stream written into an
> external system, perhaps outputting (true, User1, 1), ..., (true,
> User1, 2) indicating that User1's clicked on something.
>
> Is there an idiomatic way to convert a retract stream into a
> semantically equivalent table?
>
> Thanks,
> James
>




Re: [Question] How to use different filesystem between checkpointdata and user data sink

2019-12-19 Thread Piotr Nowojski
Hi,

Can you share the full stack trace, or just attach the JobManager and TaskManager
logs? This exception should have had some cause logged below it.

Piotrek

> On 19 Dec 2019, at 04:06, ouywl  wrote:
> 
> Hi Piotr Nowojski,
> I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The
> JobManager does not start up, and it loads the filesystem plugin from my own
> plugin jar. The log is:
> "2019-12-19 10:58:32,394 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
> 2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance - trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
> 2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
> 2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
>   at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
>   at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
>   at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
>   at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"
> 
>   
> ouywl
> ou...@139.com
>  
> 
> On 12/19/2019 00:01, Piotr Nowojski wrote:
> Hi,
> 
> As Yang Wang pointed out, you should use the new plugins mechanism.
> 
> If it doesn’t work, first make sure that you are shipping/distributing the
> plugin jars correctly, with the correct plugins directory structure on the
> client machine. Next, make sure that the cluster has the same correct setup.
> This is especially true for the standalone/cluster execution modes. For YARN,
> Mesos and Docker, the plugins dir should be shipped to the cluster by Flink
> itself; however, plugin support in YARN is currently semi-broken [1]. This is
> already fixed, but waiting to be released in 1.9.2 and 1.10.
>
> If it still doesn’t work, check the TaskManager logs to see which plugins/file
> systems are being loaded during startup. If none, that's the problem.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-14382 
> 
> 
>> On 18 Dec 2019, at 12:40, Yang Wang wrote:
>> 
>> You could try the new plugin mechanism.
>> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
>> put your filesystem-related jars in it.
>> Different plugins are loaded by separate classloaders to avoid conflicts.
>> 
>> 
>> Best,

Re: Restore metrics on broadcast state after restart

2019-12-19 Thread Gaël Renoux
Thanks, that's exactly what I needed!

On Wed, Dec 18, 2019 at 5:44 PM Yun Tang  wrote:

> Hi Gaël
>
> You can try initializeState [1] to initialize your metrics values from
> states when restoring from a checkpoint.
>
> context.getOperatorStateStore().getBroadcastState() gives you access to your
> restored broadcast state.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
>
> Best
> Yun Tang
>
> --
> *From:* Gaël Renoux 
> *Sent:* Tuesday, December 17, 2019 23:22
> *To:* user 
> *Subject:* Restore metrics on broadcast state after restart
>
> Hi everyone
>
> I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of
> rules), and I have set up a few gauge metrics on that state (things such as
> the number of known rules and the timestamp of the last rule received).
> However, I have an issue when the server restarts from a checkpoint or a
> savepoint: metric values are not restored.
>
> That's nothing anomalous: the fields used in the metrics are transient,
> not part of the state (I have followed this doc:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types).
> The fields will be reset to the proper value in the next call to
> processBroadcastElement(), but that's not enough for my use case: rule
> updates aren't that frequent (it could be minutes or even hours before the
> next one). We can't have the metrics offline for that long.
>
> Is there any way to reset those fields without waiting for the next
> messages to arrive? The open() method doesn't have access to the broadcast
> state, so I can't do it there. I could do it in processElement() (normal
> elements are much more frequent than rules), but it's far from ideal:
> - it would be done again and again for every single element received,
> which is overkill;
> - it could only update the metric on the current subtask, not the others,
> so one subtask could lag behind.
>
> Am I missing something here ? Is there any way to trigger a reset of the
> value when the broadcast state is reconstructed ?
>
> Thanks for any help,
> Gaël Renoux
>
>

-- 
Gaël Renoux
Senior R&D Engineer, DataDome
M +33 6 76 89 16 52
E gael.ren...@datadome.co  
W www.datadome.co








DataStream API min max aggregation on other fields

2019-12-19 Thread Lu Weizheng
Hi all,

On a KeyedStream, when I use maxBy or minBy, I get the max or min element, 
that is, the other fields are taken from the max or min element. This is quite 
clear. However, when I use max or min, what does Flink do with the other fields?


val tupleStream = senv.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
//  (0,0,0)
//  (0,0,1)
//  (0,0,2)
//  (1,0,6)
//  (1,0,7)
//  (1,0,8)
val maxByStream = tupleStream.keyBy(0).max(2).print()

In this case, the second field keeps the first element's value, 0.


class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {

  var isRunning: Boolean = true
  var i = 0

  val rand = new Random()

  override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {

    while (isRunning) {

      // Emit the source data into the SourceContext
      srcCtx.collect((0, i, i))
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

//(0,0,0)
//(0,1,2)
//(0,3,4)
//(0,5,6)
//(0,7,8)
//(0,9,10)

val maxWindowStream = senv.addSource(new IntTupleSource)
  .keyBy(0)
  .timeWindow(Time.milliseconds(2000))
  .max(2).print()


In this case, the result is not so clear...

So, for max and min, do the two operators give no guarantee about the values of 
the other fields?

Thank you so much if anyone can reply.

Weizheng


Re: DataStream API min max aggregation on other fields

2019-12-19 Thread vino yang
Hi weizheng,

I am not sure which part is unclear to you. Is the result not correct?
Can you share the result you expected, based on your understanding?

"keyBy" specifies the grouping field, and min/max perform the aggregation on
the other field, based on the position you specified.

Best,
Vino

On Thu, Dec 19, 2019 at 5:00 PM, Lu Weizheng wrote:

> Hi all,
>
> On a KeyedStream, when I use maxBy or minBy, I get the max or min
> element, that is, the other fields are taken from the max or min element.
> This is quite clear. However, when I use max or min, what does Flink do with
> the other fields?
>
> val tupleStream = senv.fromElements(
>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
> )
> //  (0,0,0)
> //  (0,0,1)
> //  (0,0,2)
> //  (1,0,6)
> //  (1,0,7)
> //  (1,0,8)
> val maxByStream = tupleStream.keyBy(0).max(2).print()
>
> In this case, the second field keeps the first element's value, 0.
>
> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {
>
>   var isRunning: Boolean = true
>   var i = 0
>
>   val rand = new Random()
>
>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>
>     while (isRunning) {
>
>       // Emit the source data into the SourceContext
>       srcCtx.collect((0, i, i))
>       i += 1
>       Thread.sleep(1000)
>     }
>   }
>
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> }
>
> //(0,0,0)
> //(0,1,2)
> //(0,3,4)
> //(0,5,6)
> //(0,7,8)
> //(0,9,10)
>
> val maxWindowStream = senv.addSource(new IntTupleSource)
>   .keyBy(0)
>   .timeWindow(Time.milliseconds(2000))
>   .max(2).print()
>
>
>
> In this case, the result is not so clear...
>
> So, for max and min, do the two operators give no guarantee about the values
> of the other fields?
>
> Thank you so much if anyone can reply.
>
> Weizheng
>


Can a trigger fire early before a specific element gets into a ProcessingTimeSessionWindow

2019-12-19 Thread Utopia
Hi,

I want to fire and evaluate the ProcessingTimeSessionWindow when a specific 
element comes into the current window. But I want to exclude that element 
from the window function's processing and retain it for the next evaluation.

Thanks

Best  regards
Utopia


Re: DataStream API min max aggregation on other fields

2019-12-19 Thread Biao Liu
Hi Lu,

@vino yang  I think what he means is that the "max"
semantics between window and non-window are different. It changes
non-aggregated fields unpredictably.

That's really an interesting question.

I took a look at the relevant implementation. From the perspective of the
code, "max" always keeps the non-aggregated fields with the values of the
first arrived record, which should be (0, 0, x) in this case. However, when
the window is purged, the state (which keeps the non-aggregated fields of the
first arrived record and the maximum field) is cleared. That means the
"first arrived record" is reset whenever a window is purged. That's
why the second column increases unpredictably.
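
The behaviour described above can be reproduced without Flink. The following
is a minimal sketch in plain Scala that only models the semantics described in
this thread; it does not use Flink's classes, and the names `MaxSemantics`,
`maxField2`, `rolling` and `windowed` are made up for this illustration. It
also assumes that each window holds records of a single key, as in the example
source.

```scala
object MaxSemantics {
  type Rec = (Int, Int, Int)

  // Models max(2) as described above: keep the accumulated record and
  // overwrite only the aggregated (third) field when a larger value arrives.
  def maxField2(acc: Rec, next: Rec): Rec =
    if (next._3 > acc._3) acc.copy(_3 = next._3) else acc

  // Non-windowed case: one accumulator per key (first field) that is never
  // cleared; an updated record is emitted for every input record.
  def rolling(input: Seq[Rec]): Seq[Rec] = {
    val state = scala.collection.mutable.Map.empty[Int, Rec]
    input.map { r =>
      val updated = state.get(r._1).map(maxField2(_, r)).getOrElse(r)
      state(r._1) = updated
      updated
    }
  }

  // Windowed case: the accumulator is discarded with each window, so the
  // first record of every window supplies the non-aggregated fields.
  def windowed(windows: Seq[Seq[Rec]]): Seq[Rec] =
    windows.filter(_.nonEmpty).map(_.reduce(maxField2))
}
```

With the inputs from this thread, `MaxSemantics.rolling` on the six
`fromElements` tuples gives (0,0,0), (0,0,1), (0,0,2), (1,0,6), (1,0,7),
(1,0,8), and
`MaxSemantics.windowed(Seq(Seq((0,0,0)), Seq((0,1,1), (0,2,2)), Seq((0,3,3), (0,4,4))))`
gives (0,0,0), (0,1,2), (0,3,4), assuming the windows happen to split the
records that way.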

The semantics here are quite confusing to me.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 17:50, vino yang  wrote:

> Hi weizheng,
>
> I am not sure which part is unclear to you. Is the result not correct?
> Can you share the result you expected, based on your understanding?
>
> "keyBy" specifies the grouping field, and min/max perform the aggregation on
> the other field, based on the position you specified.
>
> Best,
> Vino
>
> On Thu, Dec 19, 2019 at 5:00 PM, Lu Weizheng wrote:
>
>> Hi all,
>>
>> On a KeyedStream, when I use maxBy or minBy, I get the max or min
>> element, that is, the other fields are taken from the max or min element.
>> This is quite clear. However, when I use max or min, what does Flink do with
>> the other fields?
>>
>> val tupleStream = senv.fromElements(
>>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
>> )
>> //  (0,0,0)
>> //  (0,0,1)
>> //  (0,0,2)
>> //  (1,0,6)
>> //  (1,0,7)
>> //  (1,0,8)
>> val maxByStream = tupleStream.keyBy(0).max(2).print()
>>
>> In this case, the second field keeps the first element's value, 0.
>>
>> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {
>>
>>   var isRunning: Boolean = true
>>   var i = 0
>>
>>   val rand = new Random()
>>
>>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>>
>>     while (isRunning) {
>>
>>       // Emit the source data into the SourceContext
>>       srcCtx.collect((0, i, i))
>>       i += 1
>>       Thread.sleep(1000)
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>> }
>>
>> //(0,0,0)
>> //(0,1,2)
>> //(0,3,4)
>> //(0,5,6)
>> //(0,7,8)
>> //(0,9,10)
>>
>> val maxWindowStream = senv.addSource(new IntTupleSource)
>>   .keyBy(0)
>>   .timeWindow(Time.milliseconds(2000))
>>   .max(2).print()
>>
>>
>>
>> In this case, the result is not so clear...
>>
>> So, for max and min, do the two operators give no guarantee about the values
>> of the other fields?
>>
>> Thank you so much if anyone can reply.
>>
>> Weizheng
>>
>


Re: Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-19 Thread KristoffSC
Hi :)
Any thoughts about this? 





Re: [DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-19 Thread Till Rohrmann
The profiles make bumping ZooKeeper's version a bit more cumbersome. For
this reason, I would be interested in getting rid of them, too.

Cheers,
Till

On Wed, Dec 18, 2019 at 5:35 PM Robert Metzger  wrote:

> I guess we are talking about this profile [1] in the pom.xml?
>
> +1 to remove.
>
> I'm not sure if we need to rush this for the 1.10 release. The profile is
> not doing us any harm at the moment.
>
> [1]https://github.com/apache/flink/blob/master/pom.xml#L1035
>
> On Wed, Dec 18, 2019 at 4:51 PM Till Rohrmann 
> wrote:
>
>> Hi everyone,
>>
>> following the discussion started by Seth [1] I would like to discuss
>> dropping the vendor specific repositories from Flink's parent pom.xml. As
>> building Flink against a vendor specific Hadoop version is no longer
>> needed (as it simply needs to be added to the classpath) and documented, I
>> believe that the vendor specific repositories and the mapr profile have
>> become obsolete. Moreover, users can still use vendor specific Hadoop
>> versions if they configure their local maven to point to the respective
>> repository [2]. Flink's sources would simply no longer be shipped with
>> this option.
>>
>> Are there any concerns about dropping the vendor specific repositories
>> from pom.xml? I would like to make this change for the upcoming Flink 1.10
>> release if possible.
>>
>> [1]
>>
>> https://lists.apache.org/thread.html/83afcf6c0d5d7a0a7179cbdac9593ebe7478b0dc548781bf9915a006%40%3Cdev.flink.apache.org%3E
>> [2] https://maven.apache.org/guides/mini/guide-multiple-repositories.html
>>
>> Cheers,
>> Till
>>
>


Flink Prometheus metric doubt

2019-12-19 Thread Jesús Vásquez
Hi all, I'm monitoring Flink jobs using Prometheus.
I have been trying to use the metrics flink_jobmanager_job_uptime/downtime
to create an alert that fires when one of these values emits -1,
since the docs say this is the behavior of the metric when the job reaches
a completed state.
The thing is that I have tested the behavior when one of my jobs fails, and
the mentioned metrics never emit anything other than zero. Finally, the
metric disappears after the job has failed.
Am I missing something, or is this the expected behavior?


Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-19 Thread M Singh
Thanks Vino and Biao for your help.
Mans

On Thursday, December 19, 2019, 02:25:40 AM EST, Biao Liu wrote:
 
Hi Mans,

That's indeed a problem. We have a plan to fix it. I think it could be included 
in 1.11. You could follow this issue [1] to check the progress.

[1] https://issues.apache.org/jira/browse/FLINK-9543

Thanks,
Biao /'bɪ.aʊ/


On Thu, 19 Dec 2019 at 14:51, vino yang  wrote:

Hi Mans,

IMO, one job manager represents one Flink cluster, and one Flink cluster has one 
set of Flink configuration, e.g. the metrics reporter.
Some metrics reporters support tags, which you can set to distinguish 
different Flink clusters [1].

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter

Best,
Vino

On Thu, Dec 19, 2019 at 2:54 AM, M Singh wrote:

Hi:
I am using AWS EMR with a Flink application, and two of the job managers are 
running on the same host.  I am looking at the metrics documentation (Apache 
Flink 1.9 Documentation: Metrics) and see the following: 

   
   - metrics.scope.jm  
  - Default: .jobmanager
  - Applied to all metrics that were scoped to a job manager.
  - 

...
List of all Variables
   
   - JobManager: 
   - TaskManager: , 
   - Job: , 
   - Task: , , , , 

   - Operator: ,, 


My question: is there a way to distinguish between the two job managers? I see only 
the <host> variable for JobManager, and since the two are running on the same 
host, the value is the same. Is there any other variable that I can use to 
distinguish the two?

For the TaskManager I have the taskmanager id, but I am not sure about the job manager.
Thanks
Mans


  

POJO ERROR

2019-12-19 Thread Alexandru Vasiu
Hi,

I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
type which is a bit more complex: it has a list in it and even a
dictionary. When I try to use a custom map, I get this error:

INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does not
contain a setter for field fields
INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance.

Is there a fix for this? Or a workaround?

Thank you,
Alex


Re: Can a trigger fire early before a specific element gets into a ProcessingTimeSessionWindow

2019-12-19 Thread vino yang
Hi Utopia,

Flink provides a highly extensible window mechanism [1].

For your scenario, you can customize your window assigner and trigger.

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

Best,
Vino

On Thu, Dec 19, 2019 at 5:56 PM, Utopia wrote:

> Hi,
>
> I want to fire and evaluate the ProcessingTimeSessionWindow when a
> specific element comes into the current window. But I want to exclude that
> element from the window function's processing and retain it for the
> next evaluation.
>
> Thanks
>
> Best  regards
> Utopia
>


Re: POJO ERROR

2019-12-19 Thread Timo Walther

Hi Alex,

the problem is that `case class` classes are analyzed by Scala-specific 
code, whereas `class` classes are analyzed by Java-specific code. So I 
recommend using a case class to make sure you stay in the "Scala 
world"; otherwise the fallback is the Java-based TypeExtractor.


For your custom Map, you can simply ignore this error message. It will 
fall back to the Java-based TypeExtractor and treat it as a generic type 
because it is not a POJO.


I hope this helps.

Regards,
Timo


On 19.12.19 12:41, Alexandru Vasiu wrote:

Hi,

I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data 
type which is a bit more complex: it has a list in it and even a 
dictionary. When I try to use a custom map, I get this error:


INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does 
not contain a setter for field fields
INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot 
be used as a POJO type because not all fields are valid POJO fields, and 
must be processed as GenericType. Please read the Flink documentation on 
"Data Types & Serialization" for details of the effect on performance.


Is there a fix for this? Or a workaround?

Thank you,
Alex




Re: Kafka table descriptor missing startFromTimestamp()

2019-12-19 Thread Timo Walther

This issue is work in progress:

https://issues.apache.org/jira/browse/FLINK-15220


On 19.12.19 09:07, Dawid Wysakowicz wrote:

Hi,

The only reason it was not exposed at the beginning is that not all
versions of the consumers support starting from a specific timestamp. I
think we could expose such a setting now. Would you like to create an
issue for it?

Best,

Dawid

On 19/12/2019 06:56, Steve Whelan wrote:

Examining the org.apache.flink.table.descriptors.Kafka class in Flink
v1.8, it has the following startUpModes for consumers:
     .startFromEarliest()
     .startFromLatest()
     .startFromSpecificOffsets(...)

However, it does not have a method to support starting from a
timestamp, although the FlinkKafkaConsumer supports this feature. Was it
a conscious decision to leave that startup mode out of the table
descriptor? If so, what was the reasoning?






Re: POJO ERROR

2019-12-19 Thread Alexandru Vasiu
I used `case class`, for example `case class A(a: Map[String, String])`, so it
should work.

Alex

On Thu, Dec 19, 2019 at 2:18 PM Timo Walther  wrote:

> Hi Alex,
>
> the problem is that `case class` classes are analyzed by Scala-specific
> code, whereas `class` classes are analyzed by Java-specific code. So I
> recommend using a case class to make sure you stay in the "Scala
> world"; otherwise the fallback is the Java-based TypeExtractor.
>
> For your custom Map, you can simply ignore this error message. It will
> fall back to the Java-based TypeExtractor and treat it as a generic type
> because it is not a POJO.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 19.12.19 12:41, Alexandru Vasiu wrote:
> > Hi,
> >
> > I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
> > type which is a bit more complex: it has a list in it and even a
> > dictionary. When I try to use a custom map, I get this error:
> >
> > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does
> > not contain a setter for field fields
> > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
> > be used as a POJO type because not all fields are valid POJO fields, and
> > must be processed as GenericType. Please read the Flink documentation on
> > "Data Types & Serialization" for details of the effect on performance.
> >
> > Is there a fix for this? Or a workaround?
> >
> > Thank you,
> > Alex
>
>


Re: DataStream API min max aggregation on other fields

2019-12-19 Thread Lu Weizheng
Yes, the unpredictable non-key, non-aggregated fields confused me. As 
Biao said, it is because the window state is purged.
So when I use max or min, I should rely only on the aggregated field. The other 
fields are not defined, and I should take care not to use them.

Thank you guys for your replies!

From: Biao Liu
Sent: December 19, 2019 18:10
To: vino yang
Cc: Lu Weizheng; user@flink.apache.org

Subject: Re: DataStream API min max aggregation on other fields

Hi Lu,

@vino yang I think what he means is that the 
"max" semantics between window and non-window are different. It changes 
non-aggregated fields unpredictably.

That's really an interesting question.

I took a look at the relevant implementation. From the perspective of the code, 
"max" always keeps the non-aggregated fields with the values of the first arrived 
record, which should be (0, 0, x) in this case. However, when the window is 
purged, the state (which keeps the non-aggregated fields of the first arrived 
record and the maximum field) is cleared. That means the "first arrived record" 
is reset whenever a window is purged. That's why the second column 
increases unpredictably.

The semantics here are quite confusing to me.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 17:50, vino yang wrote:
Hi weizheng,

I am not sure which part is unclear to you. Is the result not correct? Can 
you share the result you expected, based on your understanding?

"keyBy" specifies the grouping field, and min/max perform the aggregation on the 
other field, based on the position you specified.

Best,
Vino

On Thu, Dec 19, 2019 at 5:00 PM, Lu Weizheng wrote:
Hi all,

On a KeyedStream, when I use maxBy or minBy, I get the max or min element, 
that is, the other fields are taken from the max or min element. This is quite 
clear. However, when I use max or min, what does Flink do with the other fields?


val tupleStream = senv.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
//  (0,0,0)
//  (0,0,1)
//  (0,0,2)
//  (1,0,6)
//  (1,0,7)
//  (1,0,8)
val maxByStream = tupleStream.keyBy(0).max(2).print()

In this case, the second field keeps the first element's value, 0.


class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {

  var isRunning: Boolean = true
  var i = 0

  val rand = new Random()

  override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {

    while (isRunning) {

      // Emit the source data into the SourceContext
      srcCtx.collect((0, i, i))
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

//(0,0,0)
//(0,1,2)
//(0,3,4)
//(0,5,6)
//(0,7,8)
//(0,9,10)

val maxWindowStream = senv.addSource(new IntTupleSource)
  .keyBy(0)
  .timeWindow(Time.milliseconds(2000))
  .max(2).print()


In this case, the result is not so clear...

So, for max and min, do the two operators give no guarantee about the values of 
the other fields?

Thank you so much if anyone can reply.

Weizheng


Re: POJO ERROR

2019-12-19 Thread Alexandru Vasiu
I'm sorry for my last message, it might be incomplete.

So I used case classes for my objects, but it doesn't work.

I am hitting this error: "Exception in thread "main"
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm
trying to apply the map/flatMap function over the stream (which is from a
Kafka consumer).


Alex

On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu 
wrote:

> I used `case class`, for example `case class A(a: Map[String, String])`, so it
> should work.
>
> Alex
>
> On Thu, Dec 19, 2019 at 2:18 PM Timo Walther  wrote:
>
>> Hi Alex,
>>
>> the problem is that `case class` classes are analyzed by Scala-specific
>> code, whereas `class` classes are analyzed by Java-specific code. So I
>> recommend using a case class to make sure you stay in the "Scala
>> world"; otherwise the fallback is the Java-based TypeExtractor.
>>
>> For your custom Map, you can simply ignore this error message. It will
>> fall back to the Java-based TypeExtractor and treat it as a generic type
>> because it is not a POJO.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 19.12.19 12:41, Alexandru Vasiu wrote:
>> > Hi,
>> >
>> > I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
>> > type which is a bit more complex: it has a list in it and even a
>> > dictionary. When I try to use a custom map, I get this error:
>> >
>> > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does
>> > not contain a setter for field fields
>> > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
>> > be used as a POJO type because not all fields are valid POJO fields, and
>> > must be processed as GenericType. Please read the Flink documentation on
>> > "Data Types & Serialization" for details of the effect on performance.
>> >
>> > Is there a fix for this? Or a workaround?
>> >
>> > Thank you,
>> > Alex
>>
>>


Re: POJO ERROR

2019-12-19 Thread Timo Walther

That sounds like a classloading or, most likely, a dependency issue.

Do all dependencies, including Flink, use the same Scala version? Could 
you maybe share some reproducible code with us?
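
A `NoClassDefFoundError` on a `scala/...` class can show up when artifacts
built for different Scala binary versions end up on the same classpath. As a
rough illustration of the check suggested here, the sketch below groups
artifact names by their `_2.xx` suffix. `ScalaSuffixCheck` and its methods are
hypothetical helpers written for this example, and the artifact list in the
usage note is an assumed example, not taken from Alex's build.

```scala
object ScalaSuffixCheck {
  // Matches artifact names that end in a Scala binary version suffix, e.g. "_2.12".
  private val Suffix = """.*_(\d+\.\d+)""".r

  // Collects the distinct Scala binary versions found in the artifact names;
  // names without such a suffix (plain Java artifacts) are ignored.
  def binaryVersions(artifacts: Seq[String]): Set[String] =
    artifacts.collect { case Suffix(v) => v }.toSet

  // True when at most one Scala binary version is present.
  def isConsistent(artifacts: Seq[String]): Boolean =
    binaryVersions(artifacts).size <= 1
}
```

For instance,
`ScalaSuffixCheck.binaryVersions(Seq("flink-streaming-scala_2.12", "flink-connector-kafka_2.11"))`
contains both "2.12" and "2.11", which would point to a mismatch. At runtime,
`scala.util.Properties.versionNumberString` reports the Scala library version
that was actually loaded.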


Regards,
Timo


On 19.12.19 13:53, Alexandru Vasiu wrote:

I'm sorry for my last message, it might be incomplete.

So I used case classes for my objects, but it doesn't work.

I am hitting this error: "Exception in thread "main" 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError: 
java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm 
trying to apply the map/flatMap function over the stream (which is from 
a Kafka consumer).



Alex

On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu wrote:


I used `case class`, for example `case class A(a: Map[String, String])`,
so it should work.

Alex

On Thu, Dec 19, 2019 at 2:18 PM Timo Walther wrote:

Hi Alex,

the problem is that `case class` classes are analyzed by Scala-specific
code, whereas `class` classes are analyzed by Java-specific code. So I
recommend using a case class to make sure you stay in the "Scala
world"; otherwise the fallback is the Java-based TypeExtractor.

For your custom Map, you can simply ignore this error message. It will
fall back to the Java-based TypeExtractor and treat it as a generic type
because it is not a POJO.

I hope this helps.

Regards,
Timo


On 19.12.19 12:41, Alexandru Vasiu wrote:
 > Hi,
 >
 > I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
 > type which is a bit more complex: it has a list in it and even a
 > dictionary. When I try to use a custom map I got this error:
 >
 > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does
 > not contain a setter for field fields
 > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
 > be used as a POJO type because not all fields are valid POJO fields, and
 > must be processed as GenericType. Please read the Flink documentation on
 > "Data Types & Serialization" for details of the effect on performance.
 >
 > Is there a fix for this? Or a workaround?
 >
 > Thank you,
 > Alex





Re: Processing post to sink?

2019-12-19 Thread Robert Metzger
Hey Theo,

your solution of turning the sink into a process function should work. I'm
just not sure how easy it is to re-use the StreamingFileSink inside it.
Have you considered sending all the records to a parallelism=1 process
function sitting "next" to the StreamingFileSink? You could track the
watermarks and partitions in there, and listen to the
"notifyCheckpointComplete()" calls. Since that ProcessFunction is receiving
data at the same rate as the sink, it should align with the sinks. I'm not
100% sure if this solution really works, but I wanted to bring it up to see
if you've considered it.
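
To make the idea a bit more concrete, here is a minimal sketch of the
bookkeeping such a parallelism=1 function could do. It is plain Java without
any Flink types so that it stays self-contained, and everything in it is an
assumption for illustration: the class name PartitionCompletionTracker, its
methods, and the idea that each record carries its partition id plus the
event time at which that partition ends. In a real job the three callbacks
would have to be driven by the watermark, by the state snapshot, and by
notifyCheckpointComplete() respectively.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: decides when a partition can be reported as final.
class PartitionCompletionTracker {
    // partition id -> event time at which the partition ends (sorted for stable output)
    private final TreeMap<String, Long> openPartitions = new TreeMap<>();
    // checkpoint id -> watermark that had been seen when the snapshot was taken
    private final TreeMap<Long, Long> watermarkAtCheckpoint = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    void onElement(String partitionId, long partitionEndTimestamp) {
        openPartitions.put(partitionId, partitionEndTimestamp);
    }

    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    void onSnapshot(long checkpointId) {
        watermarkAtCheckpoint.put(checkpointId, currentWatermark);
    }

    /** Returns the partitions that ended before the completed checkpoint was taken. */
    List<String> onCheckpointComplete(long checkpointId) {
        List<String> done = new ArrayList<>();
        Long watermark = watermarkAtCheckpoint.get(checkpointId);
        if (watermark == null) {
            return done; // unknown or already handled checkpoint
        }
        // Forget this checkpoint and all older ones.
        watermarkAtCheckpoint.headMap(checkpointId, true).clear();
        Iterator<Map.Entry<String, Long>> it = openPartitions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                done.add(entry.getKey());
                it.remove();
            }
        }
        return done;
    }
}
```

With this bookkeeping a partition is only reported once a checkpoint that
was taken after the watermark passed the partition's end has completed.
Whether the sink subtasks have finished committing their files at that exact
moment is the part I am unsure about.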

Best,
Robert



On Sat, Dec 14, 2019 at 7:08 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Tim,
>
> As I said:
>
> > Do you have any better ideas for implementation or is this the best way
> > to go? I thought about just building a custom sink inheriting from
> > StreamingFileSink, but I don't know how to trigger my jobs then only once.
> > I _could_ check for my sink parallel subtask index to be something like 0
> > and only in that case trigger the subtasks, but I have heavy skew in my
> > parallel instances:
> > Some process millions of elements, whereas others process just 10 events
> > a day. If my "notification-sink-subtask" would end up on a partition with
> > those few events, I would get new triggers way too seldom. And I further
> > wouldn't know if the other instances had also already committed their
> > parquet files.
>
> I don't know how to do that within a sink function. I kind of need a
> "synchronisation-barrier" after all "notify-checkpoint-complete"-calls to
> all sink instances. Can you tell me how to do that in my own sink
> function?
>
> Best regards
> Theo
>
> --
> *From: *"Timothy Victor" 
> *To: *"Theo Diefenthal" 
> *CC: *"user" 
> *Sent: *Saturday, 14 December 2019 16:27:31
> *Subject: *Re: Processing post to sink?
>
> Why not implement your own SinkFunction, or maybe inherit from the one you
> are using now?
>
> Tim
>
> On Sat, Dec 14, 2019, 8:52 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi there,
>>
>> In my pipeline, I write data into a partitioned parquet directory via
>> StreamingFileSink and a partitioner like:
>>
>> @Override
>> public String getBucketId(V element, Context context) {
>>     return "partitionkey=" + element.getPartitionkey();
>> }
>>
>> That works well so far.
>>
>> Now I need to know when all sink instances are fully "done" with a
>> partition in order to trigger some export jobs (for machine learning/model
>> training) and also notify Impala about the new (final) partition.
>>
>> In my case, I can well define "fully done". The partitionkey is directly
>> deduced from event time and my watermarks guarantee no late arrivals. So
>> once a watermark passes a certain event time, I know that the prior
>> partition is completed and can trigger my stuff. Well not directly: Once
>> the watermark passes, I need to wait for the next checkpoint to be
>> completed because only then, the parquet files are committed and the
>> partition is fully written to.
>>
>> The question is: How do I implement my "partition-completed"-condition
>> check in Flink? It pretty much comes down to that I want to do some
>> processing _after_ a Sink based on the sinks progress.
>> (Watermark+checkpoints)
>>
>> The only idea I came up with so far is: Make the sink a process-function
>> which also emits elements. Only on a completed checkpoint, emit an element
>> with the current watermark downstream. In the next step, assign event
>> timestamps based on these events and merge the parallel subtasks into one,
>> thus keeping track of the global watermark. In the task with parallelism 1,
>> I could then issue my impala queries and export jobs. (Which should not be
>> called by multiple parallel instances simultaneously).
>>
>> Do you have any better ideas for implementation or is this the best way
>> to go? I thought about just building a custom sink inheriting from
>> StreamingFileSink, but I don't know how to trigger my jobs then only once.
>> I _could_ check for my sink parallel subtask index to be something like 0
>> and only in that case trigger the subtasks, but I have heavy skew in my
>> parallel instances: Some process millions of elements, whereas others
>> process just 10 events a day. If my "notification-sink-subtask" would end
>> up on a partition with those few events, I would get new triggers way too
>> seldom. And I further wouldn't know if the other instances had also
>> already committed their parquet files.
>>
>> What kind of problems do I need to expect when making a sink a
>> process-function?
>>
>> Best regards
>> Theo
>>
>
>
> --
> SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
> Theo Diefenthal
>
> T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
> theo.diefent...@scoop-software.de - www.scoop-software.de
> Sitz der Gesellschaft: Köln, Handelsregister: Köln,
> Handelsregist

Re: POJO ERROR

2019-12-19 Thread Alexandru Vasiu
This is a part of my Gradle config:

ext {
    scalaVersion = '2.12'
    flinkVersion = '1.9.1'
    scalaBuildVersion = "${scalaVersion}.10"
    scalaMockVersion = '4.4.0'
    circeGenericVersion = '0.12.3'
    circeExtrasVersion = '0.12.2'
    pardiseVersion = '2.1.1'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
    sourceDir = 'src/main/scala'
    testDir = 'src/test/scala'
}
repositories {
    mavenCentral()
    //maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}
configurations {
    scalaCompilerPlugin
}
dependencies {
    implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
    // --
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --
    //compile "org.apache.flink:flink-java:${flinkVersion}"
    implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
    // --
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --
    //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
    // https://mvnrepository.com/artifact/io.circe/
    implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
    implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
    implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
    // https://mvnrepository.com/artifact/org.scalamacros/paradise
    scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
    implementation "log4j:log4j:${log4jVersion}"
    implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
    // Add test dependencies here.
    //testImplementation "junit:junit:4.12"
    testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
    // https://mvnrepository.com/artifact/org.scalamock/scalamock
    testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
}

So they all use the same Scala version. I cannot share the code, but the
main app looks like:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
  // this will get us a stream with our object model, which is like this:
  // case class A(a: Map[String, other_case_class_obj],
  //              b: List[other_case_class_obj], c: String)
  .addSource(KAFKA_STREAM)
  .flatMap(CustomFlatMap())
  .print

Thank you,
Alex

On Thu, Dec 19, 2019 at 3:14 PM Timo Walther  wrote:

> That's sounds like a classloading or most likely dependency issue.
>
> Are all dependencies including Flink use the same Scala version? Could
> you maybe share reproducible some code with us?
>
> Regards,
> Timo
>
>
> On 19.12.19 13:53, Alexandru Vasiu wrote:
> > I'm sorry for my last message, it might be incomplete.
> >
> > So I used case classed for my objects, but it doesn't work.
> >
> > Riching this error: "Exception in thread "main"
> >
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
>
> > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm
> > trying to apply the map/flatMap function over the stream (which is from
> > a Kafka consumer).
> >
> >
> > Alex
> >
> > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> > mailto:alexandru.ava...@gmail.com>> wrote:
> >
> > I used `case class` for example case class A(a: Map[String, String])
> > so it should work
> >
> > Alex
> >
> > On Thu, Dec 19, 2019 at 2:18 PM Timo Walther  > > wrote:
> >
> > Hi Alex,
> >
> > the problem is that `case class` classes are analyzed by Scala
> > specific
> > code whereas `class` classes are analyzed with Java specific
> > code. So I
> > would recommend to use a case class to make sure you stay in the
> > "Scala
> > world" otherwise the fallback is the Java-based TypeExtractor.
> >
> > For your custom Map, you can simply ignore this error message.
> > It will
> > fallback to the Java-based TypeExtractor and treat it as a
> > generic type
> > because it is not a POJO.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> >
> > On 19.12.19 12:41, Alexandru Vasiu wrote:
> >  > Hi,
> >  >
> >  > I use flink-scala version 1.9.1 and scala 2.12.10, and I
> > defined a data
> >  > type which is a bit more complex: it has a list in it and
> even a
> >  > dictionary. When I try to use a custom map I got this error:
> >  

Re: POJO ERROR

2019-12-19 Thread Timo Walther
I see a mismatch between scalaBuildVersion and scalaVersion. Could this 
be the issue?


Regards,
Timo


On 19.12.19 14:33, Alexandru Vasiu wrote:

This is a part of my Gradle config:

ext {
     scalaVersion = '2.12'
     flinkVersion = '1.9.1'
     scalaBuildVersion = "${scalaVersion}.10"
     scalaMockVersion = '4.4.0'
     circeGenericVersion = '0.12.3'
     circeExtrasVersion = '0.12.2'
     pardiseVersion = '2.1.1'
     slf4jVersion = '1.7.7'
     log4jVersion = '1.2.17'
     sourceDir = 'src/main/scala'
     testDir = 'src/test/scala'
}
repositories {
     mavenCentral()
     //maven { url 
"https://repository.apache.org/content/repositories/snapshots/"; }

}
configurations {
     scalaCompilerPlugin
}
dependencies {
     implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
     // --
     // Compile-time dependencies that should NOT be part of the
     // shadow jar and are provided in the lib folder of Flink
     // --
     //compile "org.apache.flink:flink-java:${flinkVersion}"
     implementation 
"org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
     implementation 
"org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"

     // --
     // Dependencies that should be part of the shadow jar, e.g.
     // connectors. These must be in the flinkShadowJar configuration!
     // --
     //flinkShadowJar 
"org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"

     // https://mvnrepository.com/artifact/io.circe/
     implementation 
"io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
     implementation 
"io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
     implementation 
"io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"

     // https://mvnrepository.com/artifact/org.scalamacros/paradise
     scalaCompilerPlugin 
"org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"

     implementation "log4j:log4j:${log4jVersion}"
     implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
     // Add test dependencies here.
     //testImplementation "junit:junit:4.12"
     testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
     // https://mvnrepository.com/artifact/org.scalamock/scalamock
     testImplementation 
"org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"

}

So all are with the same scala version. I cannot share the code, but the 
main app looks like:


val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream  = env
         .addSource(KAFKA_STREAM) // this will get us a stream with our 
object model which is like this: case class A(a:Map[String, 
other_case_class_obj], b: List[other_case_class_obj], c: String)

.flatMap(CustomFlatMap())
.print

Thank you,
Alex

On Thu, Dec 19, 2019 at 3:14 PM Timo Walther wrote:


That's sounds like a classloading or most likely dependency issue.

Are all dependencies including Flink use the same Scala version? Could
you maybe share reproducible some code with us?

Regards,
Timo


On 19.12.19 13:53, Alexandru Vasiu wrote:
 > I'm sorry for my last message, it might be incomplete.
 >
 > So I used case classed for my objects, but it doesn't work.
 >
 > Riching this error: "Exception in thread "main"
 >

org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:

 > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when
I'm
 > trying to apply the map/flatMap function over the stream (which
is from
 > a Kafka consumer).
 >
 >
 > Alex
 >
 > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
 > mailto:alexandru.ava...@gmail.com>
>> wrote:
 >
 >     I used `case class` for example case class A(a: Map[String,
String])
 >     so it should work
 >
 >     Alex
 >
 >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
mailto:twal...@apache.org>
 >     >> wrote:
 >
 >         Hi Alex,
 >
 >         the problem is that `case class` classes are analyzed by
Scala
 >         specific
 >         code whereas `class` classes are analyzed with Java specific
 >         code. So I
 >         would recommend to use a case class to make sure you stay
in the
 >         "Scala
 >         world" otherwise the fallback is the Java-based
TypeExtractor.
 >
 >         For your custom Map, you can simply ignore this error
message.
 >         It will
 >         fallback to the Java-based TypeExtra

Re: POJO ERROR

2019-12-19 Thread Alexandru Vasiu
Nope, because scalaBuildVersion is the Scala version including the minor
version, so in this case 2.12.10, and we use it only where it is needed.
We use scalaVersion to specify which Scala version each library is built
for, so the Flink artifact used will be flink-streaming-scala_2.12.

Alex

On Thu, Dec 19, 2019 at 3:40 PM Timo Walther  wrote:

> I see a mismatch between scalaBuildVersion and scalaVersion could this
> be the issue?
>
> Regards,
> Timo
>
>
> On 19.12.19 14:33, Alexandru Vasiu wrote:
> > This is a part of my Gradle config:
> >
> > ext {
> >  scalaVersion = '2.12'
> >  flinkVersion = '1.9.1'
> >  scalaBuildVersion = "${scalaVersion}.10"
> >  scalaMockVersion = '4.4.0'
> >  circeGenericVersion = '0.12.3'
> >  circeExtrasVersion = '0.12.2'
> >  pardiseVersion = '2.1.1'
> >  slf4jVersion = '1.7.7'
> >  log4jVersion = '1.2.17'
> >  sourceDir = 'src/main/scala'
> >  testDir = 'src/test/scala'
> > }
> > repositories {
> >  mavenCentral()
> >  //maven { url
> > "https://repository.apache.org/content/repositories/snapshots/"; }
> > }
> > configurations {
> >  scalaCompilerPlugin
> > }
> > dependencies {
> >  implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
> >  // --
> >  // Compile-time dependencies that should NOT be part of the
> >  // shadow jar and are provided in the lib folder of Flink
> >  // --
> >  //compile "org.apache.flink:flink-java:${flinkVersion}"
> >  implementation
> > "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
> >  implementation
> > "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
> >  // --
> >  // Dependencies that should be part of the shadow jar, e.g.
> >  // connectors. These must be in the flinkShadowJar configuration!
> >  // --
> >  //flinkShadowJar
> >
> "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
> >  // https://mvnrepository.com/artifact/io.circe/
> >  implementation
> > "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
> >  implementation
> > "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
> >  implementation
> > "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
> >  // https://mvnrepository.com/artifact/org.scalamacros/paradise
> >  scalaCompilerPlugin
> > "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
> >  implementation "log4j:log4j:${log4jVersion}"
> >  implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
> >  // Add test dependencies here.
> >  //testImplementation "junit:junit:4.12"
> >  testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
> >  // https://mvnrepository.com/artifact/org.scalamock/scalamock
> >  testImplementation
> > "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
> > }
> >
> > So all are with the same scala version. I cannot share the code, but the
> > main app looks like:
> >
> > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > val stream  = env
> >  .addSource(KAFKA_STREAM) // this will get us a stream with our
> > object model which is like this: case class A(a:Map[String,
> > other_case_class_obj], b: List[other_case_class_obj], c: String)
> > .flatMap(CustomFlatMap())
> > .print
> >
> > Thank you,
> > Alex
> >
> > On Thu, Dec 19, 2019 at 3:14 PM Timo Walther  > > wrote:
> >
> > That's sounds like a classloading or most likely dependency issue.
> >
> > Are all dependencies including Flink use the same Scala version?
> Could
> > you maybe share reproducible some code with us?
> >
> > Regards,
> > Timo
> >
> >
> > On 19.12.19 13:53, Alexandru Vasiu wrote:
> >  > I'm sorry for my last message, it might be incomplete.
> >  >
> >  > So I used case classed for my objects, but it doesn't work.
> >  >
> >  > Riching this error: "Exception in thread "main"
> >  >
> >
>  
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
> >
> >  > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when
> > I'm
> >  > trying to apply the map/flatMap function over the stream (which
> > is from
> >  > a Kafka consumer).
> >  >
> >  >
> >  > Alex
> >  >
> >  > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> >  > mailto:alexandru.ava...@gmail.com>
> >  > >> wrote:
> >  >
> >  > I used `case class` for example case class A(a: Map[String,
> > String])
> >  > so it should work
> >  >
> >  > Alex
> >  >
>

Re: Fw: Metrics based on data filtered from DataStreamSource

2019-12-19 Thread Robert Metzger
Hey Sidney,

for the .filter() function, you are passing a function without an open()
method: the function that getFilter() returns has no open() method.
What could work is creating a

Handler extends AbstractRichFunction implements MapFunction, FilterFunction

and passing those instances into the filter() and map() operator.
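
As a rough illustration of why this helps, here is a toy model in plain
Java. None of these types are Flink classes: Filter, Mapper, Rich,
MiniOperator and EvenHandler are made-up stand-ins for FilterFunction,
MapFunction, AbstractRichFunction, an operator and your Handler. The only
point of the sketch is that open() reaches a function only when the object
handed to the operator is itself a rich function, which a lambda is not.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-ins for Flink's FilterFunction, MapFunction and AbstractRichFunction.
interface Filter<T> { boolean filter(T value); }
interface Mapper<T, R> { R map(T value); }
abstract class Rich { void open() {} }

// Mimics an operator opening its user function: only rich functions get open().
class MiniOperator {
    static void openIfRich(Object userFunction) {
        if (userFunction instanceof Rich) {
            ((Rich) userFunction).open();
        }
    }
}

// One object that is both the filter and the mapper.
class EvenHandler extends Rich implements Filter<Integer>, Mapper<Integer, String> {
    AtomicLong filteredCounter; // stays null until open() has run

    @Override
    void open() {
        if (filteredCounter == null) {
            filteredCounter = new AtomicLong();
        }
    }

    @Override
    public boolean filter(Integer value) {
        boolean keep = value % 2 == 0;
        if (!keep) {
            filteredCounter.incrementAndGet(); // count what gets filtered out
        }
        return keep;
    }

    @Override
    public String map(Integer value) {
        return "even:" + value;
    }

    // The original pattern: the returned function is not a Rich instance.
    Filter<Integer> getFilter() {
        return value -> filter(value);
    }
}
```

Handing getFilter() to MiniOperator.openIfRich() leaves filteredCounter
null, while handing the EvenHandler itself initialises it. In the real job
that would correspond to dataStreamSource.filter(handler).map(handler).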


On Tue, Dec 17, 2019 at 3:18 AM vino yang  wrote:

> Hi Sidney,
>
> >> I'd prefer not to consume messages I don't plan on actually handling.
>
> It depends on your design. If you produce different types into different
> partitions, then it's easy to filter different types from the Kafka
> consumer (only consume some of the partitions).
>
> If you do not distinguish different types in the partitions of the Kafka
> topic, you can filter messages based on type in the Flink job.
>
> >> I MUST consume the messages, count those I want to filter out and then
> simply not handle them?
>
> I did not say "you MUST", I said "you can".
>
> Actually, there are several solutions.
>
> e.g.
> 1) the one I described in the last mail;
> 2) filter in the Flink source;
> 3) filter via a Flink filter transform function;
> 4) side output/split, select
>
> Choose the solution that suits your scenario.
>
> The key point of my last mail was to describe the problem with your
> reflection approach.
>
> Best,
> Vino
>
> Sidney Feiner wrote on Mon, Dec 16, 2019 at 9:31 PM:
>
>> You are right with everything you say!
>> The solution you propose is actually what I'm trying to avoid. I'd prefer
>> not to consume messages I don't plan on actually handling.
>> But from what you say it sounds I have no other choice. Am I right? I
>> MUST consume the messages, count those I want to filter out and then simply
>> not handle them?
>> Which means I must filter them in the task itself and I have no way of
>> filtering them directly from the data source?
>>
>>
>> *Sidney Feiner* */* Data Platform Developer
>> M: +972.528197720 */* Skype: sidney.feiner.startapp
>>
>> --
>> *From:* vino yang 
>> *Sent:* Monday, December 16, 2019 7:56 AM
>> *To:* Sidney Feiner 
>> *Cc:* user@flink.apache.org 
>> *Subject:* Re: Fw: Metrics based on data filtered from DataStreamSource
>>
>> Hi Sidney,
>>
>> Firstly, the `open` method of UDF's instance is always invoked when the
>> task thread starts to run.
>>
>> From the second code snippet image that you provided, I guess you are
>> trying to get a dynamic handler via reflection, is that correct? I also
>> guess that you want to get a dynamic instance of a handler at runtime,
>> correct me if I am wrong.
>>
>> IMO, you may be confusing the program you write with the runtime of the
>> Task: the purpose of your program is to build the job graph, while the
>> logic in the UDF describes the user's business logic.
>>
>> For your scenario, if many types of events exist in one topic, you can
>> consume them, group by the type and then count them?
>>
>> Best,
>> Vino
>>
>> Sidney Feiner wrote on Mon, Dec 16, 2019 at 12:54 AM:
>>
>> Hey,
>> I have a question about using metrics based on filtered data.
>> Basically, I have handlers for many types of events I get from my data
>> source (in my case, Kafka), and every handler has its own filter function.
>> That given handler also has a Counter, incrementing every time it filters
>> out an event (as part of the FilterFunction).
>>
>> The problem arises when I use that FilterFunction on the DataStreamSource:
>> the handler's open() function hasn't been called and thus the metrics have
>> never been initialized.
>> Do I have a way of making this work? Or any other way of counting events
>> that have been filtered out from the DataStreamSource?
>>
>> Handler:
>>
>> public abstract class Handler extends RichMapFunction {
>>     private transient Counter filteredCounter;
>>     private boolean isInit = false;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         if (!isInit) {
>>             MetricGroup metricGroup =
>>                 getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
>>             filteredCounter =
>>                 metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
>>             isInit = true;
>>         }
>>     }
>>
>>     public final FilterFunction getFilter() {
>>         return (FilterFunction) event -> {
>>             boolean res = filter(event);
>>             if (!res) {
>>                 filteredCounter.inc();
>>             }
>>             return res;
>>         };
>>     }
>>
>>     abstract protected boolean filter(Event event);
>> }
>>
>>
>> And when I init the DataStreamSource:
>>
>> Handler handler = (Handler)
>>     Class.forName(handlerName).getConstructor().newInstance();
>> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>>
>>
>> Any help would be much appreciated!
>>
>> Thanks 🙂
>>
>>
>>
>>


Re: How to convert retract stream to dynamic table?

2019-12-19 Thread David Anderson
The Elasticsearch, HBase, and JDBC[1] table sinks all support streaming
UPSERT mode[2]. While not exactly the same as RETRACT mode, it seems like
this might get the job done (unless I'm missing something, which is
entirely possible).

David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#jdbc-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#update-modes



On Thu, Dec 19, 2019 at 9:20 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> Correct me if I am wrong James, but I think your original question was how
> do you create a Table out of a changelog (a stream with a change flag).
> Unfortunately I think it is not possible right now. This definitely is high
> on our priority list for the near future. There were first approaches[1] to
> implement that before, but we must clarify all aspects of such operation
> first.
>
> Best,
>
> Dawid
>
> [1] https://github.com/apache/flink/pull/6787
> On 19/12/2019 04:05, Kurt Young wrote:
>
> Hi James,
>
> If I understand correctly, you can use `TableEnvironment#sqlQuery` to
> achieve
> what you want. You can pass the whole sql statement in and get a `Table`
> back
> from the method. I believe this is the table you want which is
> semantically
> equivalent with the stream you mentioned.
>
> For example, you can further operate on the `Table` with other sql
> operations,
> like `GROUP BY cnt` on the returned table. You can think of it in this way
> that
> Flink would attach another aggregation operator to the original plan, and
> this
> operator can consume the retraction stream which the original sql
> statement
> produced and start to generate correct results.
>
> Best,
> Kurt
>
>
> On Thu, Dec 19, 2019 at 1:25 AM James Baker  wrote:
>
>> Hi!
>> I've been looking at Flink for the last few days and have very much
>> appreciated the concept of Dynamic Tables, it solves a lot of my needs and
>> handles a lot of the complex state tracking that is otherwise painful. I
>> have a question about the composability of the system which the docs don't
>> answer.
>>
>> The docs use the example of 'SELECT user, COUNT(url) as cnt FROM clicks
>> GROUP BY user', where clicks is a stream coming in of user and the url
>> they've clicked.
>>
>> From such a Table, I can then get a retract stream written into an
>> external system, perhaps outputting (true, User1, 1), ..., (true, User1, 2)
>> indicating that User1's clicked on something.
>>
>> Is there an idiomatic way to convert a retract stream into a semantically
>> equivalent table?
>>
>> Thanks,
>> James
>>
>


Re: POJO ERROR

2019-12-19 Thread Timo Walther
Sorry, you are right. Maybe you can also share the full stack trace 
because I don't know where this guava library should be used.


Regards,
Timo


On 19.12.19 14:50, Alexandru Vasiu wrote:
Nope, because scalaBuildVersion is the scala version including minor 
version so in this case: 2.12.10 and we used it just where we need.
We used scalaVersion to specify for each library what scala is used, so 
used flink will be flink-streaming-scala_2.12


Alex

On Thu, Dec 19, 2019 at 3:40 PM Timo Walther wrote:


I see a mismatch between scalaBuildVersion and scalaVersion could this
be the issue?

Regards,
Timo


On 19.12.19 14:33, Alexandru Vasiu wrote:
 > This is a part of my Gradle config:
 >
 > ext {
 >      scalaVersion = '2.12'
 >      flinkVersion = '1.9.1'
 >      scalaBuildVersion = "${scalaVersion}.10"
 >      scalaMockVersion = '4.4.0'
 >      circeGenericVersion = '0.12.3'
 >      circeExtrasVersion = '0.12.2'
 >      pardiseVersion = '2.1.1'
 >      slf4jVersion = '1.7.7'
 >      log4jVersion = '1.2.17'
 >      sourceDir = 'src/main/scala'
 >      testDir = 'src/test/scala'
 > }
 > repositories {
 >      mavenCentral()
 >      //maven { url
 > "https://repository.apache.org/content/repositories/snapshots/"; }
 > }
 > configurations {
 >      scalaCompilerPlugin
 > }
 > dependencies {
 >      implementation
"org.scala-lang:scala-library:${scalaBuildVersion}"
 >      //
--
 >      // Compile-time dependencies that should NOT be part of the
 >      // shadow jar and are provided in the lib folder of Flink
 >      //
--
 >      //compile "org.apache.flink:flink-java:${flinkVersion}"
 >      implementation
 >
"org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
 >      implementation
 >
"org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
 >      //
--
 >      // Dependencies that should be part of the shadow jar, e.g.
 >      // connectors. These must be in the flinkShadowJar
configuration!
 >      //
--
 >      //flinkShadowJar
 >

"org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
 >      // https://mvnrepository.com/artifact/io.circe/
 >      implementation
 > "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
 >      implementation
 > "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
 >      implementation
 > "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
 >      // https://mvnrepository.com/artifact/org.scalamacros/paradise
 >      scalaCompilerPlugin
 > "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
 >      implementation "log4j:log4j:${log4jVersion}"
 >      implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
 >      // Add test dependencies here.
 >      //testImplementation "junit:junit:4.12"
 >      testImplementation
"org.scalatest:scalatest_${scalaVersion}:3.1.0"
 >      // https://mvnrepository.com/artifact/org.scalamock/scalamock
 >      testImplementation
 > "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
 > }
 >
 > So all are with the same scala version. I cannot share the code,
but the
 > main app looks like:
 >
 > val env = StreamExecutionEnvironment.getExecutionEnvironment
 > val stream  = env
 >          .addSource(KAFKA_STREAM) // this will get us a stream
with our
 > object model which is like this: case class A(a:Map[String,
 > other_case_class_obj], b: List[other_case_class_obj], c: String)
 > .flatMap(CustomFlatMap())
 > .print
 >
 > Thank you,
 > Alex
 >
 > On Thu, Dec 19, 2019 at 3:14 PM Timo Walther mailto:twal...@apache.org>
 > >> wrote:
 >
 >     That's sounds like a classloading or most likely dependency
issue.
 >
 >     Are all dependencies including Flink use the same Scala
version? Could
 >     you maybe share reproducible some code with us?
 >
 >     Regards,
 >     Timo
 >
 >
 >     On 19.12.19 13:53, Alexandru Vasiu wrote:
 >      > I'm sorry for my last message, it might be incomplete.
 >      >
 >      > So I used case classed for my objects, but it doesn't work.
 >      >
 >      > Riching this error: "Exception in thread "main"
 >      >
 >   
  org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:

 >
 >     

Deprecated SplitStream class - what should be use instead.

2019-12-19 Thread KristoffSC
Hi,
I've noticed that the SplitStream class is marked as deprecated, although
the split method of DataStream is not.
Also, the SplitStream docs do not propose an alternative.

In my use case I will have a stream of events that I have to split into two
separate streams based on some function. Events with a field that meets some
condition should go to the first stream, whereas all others should go to the
other stream.

Later both streams should be processed in a different manner. 

I was planning to use the approach presented here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

But it turns out that SplitStream is deprecated.
I've also found a similar question on SO:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream

I don't think filter and SideOutputs are a good choice here.

I would be thankful for any suggestions.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Querying DataStream for events before a given time

2019-12-19 Thread Cindy McMullen
Hi -

I’m streaming events from Kafka, processing in EventTime.  I’d like to process 
only events that are older (before) some given time (say, 2 days ago) at an 
interval of 5 minutes.  I’ve experimented with Flink DynamicTables:

String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
    " FROM " + rawTable +
    " WHERE status='RETRY'" +
    " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";


But this ignores events that are older than 5 minutes.  Here’s my timestamp
assigner:

public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {

  private final long maxTimeLag = 2000; // 2 seconds

  @Override
  public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
    return rr.getLastTry().getTime();
  }

  @Override
  public Watermark getCurrentWatermark() {
    return new Watermark(System.currentTimeMillis() - maxTimeLag);
  }
}

So, a couple of questions:

1. How can I get this query to recognize earlier events (before 5 minutes ago)?
2. Is using Dynamic Table a good solution, or could I accomplish the same thing 
using DataStream windowing?

Thanks -

— Cindy

Re: Need guidance on a use case

2019-12-19 Thread Timo Walther

Hi Eva,

I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
always joins an incoming record with all previously arrived records. Maybe
Jark in CC has some idea?


It might make sense to use the DataStream API instead with a connect() 
and CoProcessFunction where you can simply put the latest row into state 
and perform the joining and emission of a new row when required.
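
For illustration, here is a minimal plain-Java sketch of the state logic such a CoProcessFunction could hold. It has no Flink dependency, so it only models the semantics: one input overwrites the latest row per UserID, the other looks the latest rows up and emits a left-joined result. The class and method names (`LatestUserJoin`, `onUser`, `onTask`) are hypothetical, and the map is only a stand-in for Flink keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Models the keep-latest-then-join logic; this is not Flink code. */
final class LatestUserJoin {
    // Stand-in for keyed state: the latest user name per UserID.
    private final Map<Integer, String> latestUser = new HashMap<>();
    // Stand-in for the output collector: joined rows emitted so far.
    private final List<String> emitted = new ArrayList<>();

    /** User input: overwrite the previous row for this key and emit nothing. */
    void onUser(int userId, String name) {
        latestUser.put(userId, name);
    }

    /** Task input: join each assignee column against the latest user row only. */
    void onTask(String taskId, int primary, int secondary, int manager) {
        emitted.add(taskId + "," + lookup(primary) + "," + lookup(secondary) + "," + lookup(manager));
    }

    // LEFT JOIN semantics: an unknown user yields null instead of dropping the task.
    private String lookup(int userId) {
        return latestUser.get(userId);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LatestUserJoin join = new LatestUserJoin();
        join.onUser(1000, "alice");
        join.onUser(1000, "alicia"); // later message for the same key replaces the first
        join.onUser(2000, "bob");
        join.onTask("T1", 1000, 2000, 3000);
        System.out.println(join.emitted()); // [T1,alicia,bob,null]
    }
}
```

In this model a later User update does not re-emit rows for tasks that were already joined; whether that is desired depends on the requirement.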


Regards,
Timo


On 18.12.19 23:44, Eva Eva wrote:

Hi Team,

I'm trying Flink for the first time and encountered an issue that I 
would like to discuss and understand if there is a way to achieve my use 
case with Flink.


*Use case:* I need to perform unbounded stream joins on multiple data 
streams by listening to different Kafka topics. I have a scenario to 
join a column in a table with multiple columns in another table by 
avoiding duplicate joins. The main concern is that I'm not able to avoid 
duplicate joins.


*Issue: *Given the nature of the data, it is possible to have updates over
time, sent as new messages since Kafka is immutable. For a given key I
would like to perform the join only on the latest message, whereas currently
Flink performs the join against all messages with the key (this is what I'm
calling the duplicate joins issue).
Example: Say I have two Kafka streams "User" and "Task". And I want to 
join "User" with multiple columns in "Task".
Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" and 
"Manager" in "Task".


Assuming I created and registered DataStreams.
Below is my query:

   SELECT * FROM Task t
    LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
    LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
    LEFT JOIN User uc ON t.Manager = uc.UserID

Say I have 5 different messages in Kafka with UserID=1000. I don't want
to perform 5 joins; instead, I want to join only with the latest
message with UserID=1000. Is there any way to achieve this without using
Temporal Table Functions?


*I cannot use Temporal Table Functions because of below reasons:*
1. I need to trigger JOIN operation for every new message in Kafka. 
Whereas new messages in Temporal Table don't trigger JOIN operation.
2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can only 
be used for INNER JOINS
3. From what I understand, JOIN in Temporal Table can only be performed 
using Primary key, so I won't be able to Join more than one key.



Could someone please help me with this? Please let me know if any of the 
information is not clear or need more details.


  If this is not the correct email id, could you please point me to the 
correct one.



Thanks in advance!




Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-19 Thread Bowen Li
- integrate PyFlink with Jupyter notebook
   - Description: users should be able to run PyFlink seamlessly in Jupyter
   - Benefits: Jupyter is the industry-standard notebook for data
scientists. I’ve talked to a few companies in North America; they think
Jupyter is the #1 way to empower internal DS with Flink


On Wed, Dec 18, 2019 at 19:05 jincheng sun  wrote:

> Also CC user-zh.
>
> Best,
> Jincheng
>
>
> jincheng sun  于2019年12月19日周四 上午10:20写道:
>
>> Hi folks,
>>
>> As release-1.10 is under feature-freeze(The stateless Python UDF is
>> already supported), it is time for us to plan the features of PyFlink for
>> the next release.
>>
>> To make sure the features supported in PyFlink are the mostly demanded
>> for the community, we'd like to get more people involved, i.e., it would be
>> better if all of the devs and users join in the discussion of which kind of
>> features are more important and urgent.
>>
>> We have already listed some features from different aspects which you can
>> find below, however it is not the ultimate plan. We appreciate any
>> suggestions from the community, either on the functionalities or
>> performance improvements, etc. Would be great to have the following
>> information if you want to suggest to add some features:
>>
>> -
>> - Feature description: 
>> - Benefits of the feature: 
>> - Use cases (optional): 
>> --
>>
>> Features in my mind
>>
>> 1. Integration with most popular Python libraries
>> - fromPandas/toPandas API
>>Description:
>>   Support to convert between Table and pandas.DataFrame.
>>Benefits:
>>   Users could switch between Flink and Pandas API, for example,
>> do some analysis using Flink and then perform analysis using the Pandas API
>> if the result data is small and could fit into the memory, and vice versa.
>>
>> - Support Scalar Pandas UDF
>>Description:
>>   Support scalar Pandas UDF in Python Table API & SQL. Both the
>> input and output of the UDF is pandas.Series.
>>Benefits:
>>   1) Scalar Pandas UDF performs better than row-at-a-time UDF,
>> ranging from 3x to over 100x (from pyspark)
>>   2) Users could use Pandas/Numpy API in the Python UDF
>> implementation if the input/output data type is pandas.Series
>>
>> - Support Pandas UDAF in batch GroupBy aggregation
>>Description:
>>Support Pandas UDAF in batch GroupBy aggregation of Python
>> Table API & SQL. Both the input and output of the UDF is pandas.DataFrame.
>>Benefits:
>>   1) Pandas UDAF performs better than row-at-a-time UDAF more
>> than 10x in certain scenarios
>>   2) Users could use Pandas/Numpy API in the Python UDAF
>> implementation if the input/output data type is pandas.DataFrame
>>
>> 2. Fully support  all kinds of Python UDF
>> - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
>> give us some use case if you want this feature to be contained in the next
>> release)
>>   Description:
>> Support UDAF in GroupBy aggregation.
>>   Benefits:
>> Users could define and use Python UDAF and use it in GroupBy
>> aggregation. Without it, users have to use Java/Scala UDAF.
>>
>> - Support Python UDTF
>>   Description:
>>Support  Python UDTF in Python Table API & SQL
>>   Benefits:
>> Users could define and use Python UDTF in Python Table API & SQL.
>> Without it, users have to use Java/Scala UDTF.
>>
>> 3. Debugging and Monitoring of Python UDF
>>- Support User-Defined Metrics
>>  Description:
>>Allow users to define user-defined metrics and global job
>> parameters with Python UDFs.
>>  Benefits:
>>UDF needs metrics to monitor some business or technical
>> indicators, which is also a requirement for UDFs.
>>
>>- Make the log level configurable
>>  Description:
>>Allow users to config the log level of Python UDF.
>>  Benefits:
>>Users could configure different log levels when debugging and
>> deploying.
>>
>> 4. Enrich the Python execution environment
>>- Docker Mode Support
>>  Description:
>>  Support running python UDF in docker workers.
>>  Benefits:
>>  Support various of deployments to meet more users' requirements.
>>
>> 5. Expand the usage scope of Python UDF
>>- Support to use Python UDF via SQL client
>>  Description:
>>  Support to register and use Python UDF via SQL client
>>  Benefits:
>>  SQL client is a very important interface for SQL users. This
>> feature allows SQL users to use Python UDFs via SQL client.
>>
>>- Integrate Python UDF with Notebooks
>>  Description:
>>  Such as Zeppelin, etc (Especially Python dependencies)
>>
>>- Support to register Python UDF into catalog
>>   Description:
>>   Support to register Python UDF into catalog
>>   Benefits:
>>   1)Catalog is the centralized pl

RE: Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-19 Thread Theo Diefenthal
Hi Krzysztof,

You can just key your stream by transaction id. If you have lots of
different transaction ids, you can expect the load to be evenly
distributed. All events with the same key (==transaction id) will be
processed by the same task slot.

If you only have a few Kafka partitions, you could key by transaction id
as early as possible in order to fully utilize your cluster. Remember,
however, that each keyBy will cause a network shuffle, so it's probably
not worth it to first key by transaction id, then by tradeId, and afterwards
again by transaction id.
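
As a rough illustration of why the same key always lands on the same parallel subtask, here is a simplified plain-Java model of keyed routing. To my understanding Flink hashes the key into a key group and maps key groups onto subtasks; the sketch below follows that shape but uses a simple stand-in hash (`mix`) rather than Flink's actual hash function, and all names (`KeyRouting`, `keyGroup`, `subtask`) are hypothetical.

```java
import java.util.Arrays;

/** Simplified model of keyed routing; not Flink's actual implementation. */
final class KeyRouting {

    // Stand-in for the real hash function: any deterministic mix works for the sketch.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    /** Key group in [0, maxParallelism), derived only from the key's hash code. */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Index in [0, parallelism) of the subtask that owns the key's key group. */
    static int subtask(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 9; // e.g. 3 task managers with 3 slots each
        int[] perSubtask = new int[parallelism];
        for (long transactionId = 0; transactionId < 100_000; transactionId++) {
            perSubtask[subtask(transactionId, maxParallelism, parallelism)]++;
        }
        // Number of distinct transaction ids routed to each subtask.
        System.out.println(Arrays.toString(perSubtask));
    }
}
```

Because the subtask index depends only on the key, every event with a given transaction id is routed to the same place, and many distinct ids spread across all subtasks.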

Best regards
Theo

-Original Message-
From: KristoffSC  
Sent: Dienstag, 17. Dezember 2019 23:35
To: user@flink.apache.org
Subject: Keyed stream, parallelism, load balancing and ensuring that the
same key go to the same Task Manager and task slot

Hi community,
I'm trying to build a PoC pipeline for my project and I have few questions
regarding load balancing between task managers and ensuring that keyed
stream events for the same key will go to the same Task Manager (hence the
same task slot).

Let's assume that we have 3 task managers with 3 task slots each, which gives us
9 task slots in total.
The source is a Kafka topic with N partitions. Events are "linked" with
each other by transactionId (long) field. So they can be keyed by this
field.
Events for particular transactionId can be spanned across many partitions
(we don't have control over this).

The pipeline is:
1. Kafka Source -> produces RawEvents (map operator).
2. Enrichment with AsyncFunction (simple DB/cache call) produces
EnrichedEvents with map operator.
3. Key EnrichedEvents by tradeId, buffer events for some time, sort them
by sequenceNumber (Window aggregation) and emit a new event based on
those. N sorted EnrichedEvents produce one TransactionEvent for this
transactionId.
4. Sink TransactionEvents

Requirements:
1. Have high task slot utilization (Low number of idle/un-addressed task
slots).
2. EnrichedEvents for the same transactionId should go to the same
TaskSlot (hence the same TaskManager).

Question:
How can this be achieved?
How should the parallelism value for each operator be set?

Note:
Probably I can already key the original RawEvents on transactionId.

Thanks,
Krzysztof



--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Deprecated SplitStream class - what should be use instead.

2019-12-19 Thread Kostas Kloudas
Hi Kristoff,

The recommended alternative is to use SideOutputs as described in [1].
Could you elaborate why you think side outputs are not a good choice
for your usecase?

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On Thu, Dec 19, 2019 at 5:13 PM KristoffSC
 wrote:
>
> Hi,
> I've noticed that SplitStream class is marked as deprecated, although split
> method of DataStream is not.
> Also there is no alternative proposed in SplitStream doc for it.
>
> In my use case I will have a stream of events that I have to split into two
> separate streams based on some function. Events with field that meets some
> condition should go to the first stream, where all other should go to the
> different stream.
>
> Later both streams should be processed in a different manner.
>
> I was planning to use the approach presented here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/
>
> SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
>     @Override
>     public Iterable<String> select(Integer value) {
>         List<String> output = new ArrayList<String>();
>         if (value % 2 == 0) {
>             output.add("even");
>         }
>         else {
>             output.add("odd");
>         }
>         return output;
>     }
> });
>
> But it turns out that SplitStream is deprecated.
> Also I've found similar question on SO
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> I don't think filter and SideOutputs are a good choice here.
>
> I would be thankful for any suggestions.
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Querying DataStream for events before a given time

2019-12-19 Thread Cindy McMullen
This is close:

String query = "SELECT pid, status, lastTry " +
    " FROM " + rawTable +
    " WHERE status='RECOVERABLE'" +
    " GROUP BY HOP(UserActionTime, INTERVAL '30' SECOND, INTERVAL '5' HOUR), pid, status, lastTry";

But I need to have a stream/table that will dynamically update every 30 seconds
with only events that were not in the last query.


On 2019/12/19 16:21:28, Cindy McMullen  wrote: 
> Hi -
>
> I’m streaming events from Kafka, processing in EventTime.  I’d like to
> process only events that are older (before) some given time (say, 2 days ago)
> at an interval of 5 minutes.  I’ve experimented with Flink DynamicTables:
>
> String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
>     " FROM " + rawTable +
>     " WHERE status='RETRY'" +
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
>
>
> But this ignores events that are older than 5 minutes.  Here’s my timestamp
> assigner:
>
> public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {
>
>   private final long maxTimeLag = 2000; // 2 seconds
>
>   @Override
>   public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>     return rr.getLastTry().getTime();
>   }
>
>   @Override
>   public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - maxTimeLag);
>   }
> }
>
> So, a couple of questions:
>
> 1. How can I get this query to recognize earlier events (before 5 minutes
> ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same
> thing using DataStream windowing?
>
> Thanks -
>
> -- Cindy

Re: Deprecated SplitStream class - what should be use instead.

2019-12-19 Thread KristoffSC
Kostas, thank you for your response,

Well although the Side Outputs would do the job, I was just surprised that
those are the replacements for stream splitting.

The thing is, and this might be only a subjective opinion, that I
would assume Side Outputs should be used only to produce something
aside from the main processing function, like control messages or some
leftovers.

In my case, I wanted to simply split the stream into two new streams based
on some condition.
With side outputs I will have to "treat" the second stream as something
additional to the main processing result.

Like it is written in the docs: 
"*In addition* to the main stream that results from DataStream
operations(...)"

or
"The type of data in the result streams does not have to match the type of
data in the *main *stream and the types of the different side outputs can
also differ. "


In my case I don't have any "addition" to my main stream, and actually both
split streams are equally important :)

So by writing that side outputs are not good for my use case I meant that
they are not fitting conceptually, at least in my opinion.

Regards,
Krzysztof




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Triggering temporal queries

2019-12-19 Thread Cindy McMullen
This code runs and returns the correct result on the initial query, but fails 
to trigger as data continues to stream in on Kafka.  Is there anything obvious 
I’m missing?

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
tableEnv = StreamTableEnvironment.create(env);

// Consume RedactionResults from Kafka into DataStream
DataStream<RedactionResult> rrStream =
    env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
Table rawTable = tableEnv.fromDataStream(rrStream,
    "lastTry, pid, tid, status, UserActionTime.proctime");
rawTable.printSchema();

// This works on initial query, but fails to trigger afterwards.
String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +
    " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);



Unit testing filter function in flink

2019-12-19 Thread Vishwas Siravara
Hi guys,
I want to test a function like :

private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || value.get(StipFields.requestMessageType) == null) {
        false
      } else {
        ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
          ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
          (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
          !(value.get(StipFields.processingCode).toString.equals("33"))
      }
    }
  })
}

How can I do this ?

Best,
Vishwas
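
One common approach, sketched here under stated assumptions, is to pull the predicate out of the anonymous FilterFunction so that it can be exercised without starting a job; the Flink filter then just delegates to it. The sketch is plain Java with no Flink or Avro dependency: a `Map` stands in for `GenericRecord`, the field names are hypothetical string keys, the allowed lists are passed in instead of being read from `ExecutionEnv`, and `StipRecordFilter` is an invented name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical extraction of the predicate; a Map stands in for GenericRecord. */
final class StipRecordFilter {
    private final Set<String> messageTypes;
    private final Set<String> pcrs;

    StipRecordFilter(Set<String> messageTypes, Set<String> pcrs) {
        this.messageTypes = messageTypes;
        this.pcrs = pcrs;
    }

    /** Same conditions as the filter above, except that missing fields count as a non-match. */
    boolean accept(Map<String, Object> value) {
        if (value == null || value.get("requestMessageType") == null) {
            return false;
        }
        return messageTypes.contains(value.get("requestMessageType").toString())
            && pcrs.contains(String.valueOf(value.get("pcrList")))
            && Integer.valueOf(0).equals(value.get("rejectCode"))
            && !"33".equals(String.valueOf(value.get("processingCode")));
    }

    /** Test helper that builds a record with the four fields the predicate reads. */
    static Map<String, Object> record(String type, String pcr, int rejectCode, String processingCode) {
        Map<String, Object> r = new HashMap<>();
        r.put("requestMessageType", type);
        r.put("pcrList", pcr);
        r.put("rejectCode", rejectCode);
        r.put("processingCode", processingCode);
        return r;
    }

    public static void main(String[] args) {
        StipRecordFilter f = new StipRecordFilter(
            new HashSet<>(Arrays.asList("0100")), new HashSet<>(Arrays.asList("A1")));
        System.out.println(f.accept(record("0100", "A1", 0, "00"))); // all conditions hold
        System.out.println(f.accept(record("0100", "A1", 0, "33"))); // excluded processing code
        System.out.println(f.accept(null));                          // null record
    }
}
```

Passing the lists through the constructor keeps the test independent of global configuration, which is a design choice of this sketch rather than something the original code does.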


Re: Triggering temporal queries

2019-12-19 Thread Cindy McMullen
Never mind.  The code is correct; the input test data was not. All is well.

 FWIW, it’s useful while debugging to select the results of the time function 
itself:

String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable +
    " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);

TupleTypeInfo<Tuple3<Timestamp, Timestamp, Integer>> typeInfoTs =
    new TupleTypeInfo<>(Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.INT());
tableEnv.toAppendStream(qTable, typeInfoTs)
    .process(new ProcessFunction<Tuple3<Timestamp, Timestamp, Integer>, Tuple3<Timestamp, Timestamp, Integer>>() {
      @Override
      public void processElement(Tuple3<Timestamp, Timestamp, Integer> t, Context context,
          Collector<Tuple3<Timestamp, Timestamp, Integer>> collector) throws Exception {
        logger.debug("QR: " + t);
        collector.collect(t);
      }
    })
    .addSink(new DiscardingSink<>());
env.execute();

19/12/19 17:50:37 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:32:40.58,2019-12-19 17:50:37.955,77)
19/12/19 17:50:46 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:42:40.58,2019-12-19 17:50:46.955,68)
19/12/19 17:50:55 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:52:40.58,2019-12-19 17:50:55.958,58)

On 2019/12/19 21:41:17, Cindy McMullen  wrote: 
> This code runs and returns the correct result on the initial query, but fails
> to trigger as data continues to stream in on Kafka.  Is there anything
> obvious I’m missing?
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> tableEnv = StreamTableEnvironment.create(env);
>
> // Consume RedactionResults from Kafka into DataStream
> DataStream<RedactionResult> rrStream =
>     env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
> Table rawTable = tableEnv.fromDataStream(rrStream,
>     "lastTry, pid, tid, status, UserActionTime.proctime");
> rawTable.printSchema();
>
> // This works on initial query, but fails to trigger afterwards.
> String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +
>     " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
> logger.debug("Query: " + query);
>
> Table qTable = tableEnv.sqlQuery(query);
>
> 

Re: Querying DataStream for events before a given time

2019-12-19 Thread Cindy McMullen
Never mind.  Flink docs state that the query is an append, not an update, so 
the query is working as expected.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html#continuous-queries
 


A better solution is something along the lines of this:

String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable +
    " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";

which can be modified to select on desired fields.

On 2019/12/19 16:21:28, Cindy McMullen  wrote: 
> Hi -
>
> I’m streaming events from Kafka, processing in EventTime.  I’d like to
> process only events that are older (before) some given time (say, 2 days ago)
> at an interval of 5 minutes.  I’ve experimented with Flink DynamicTables:
>
> String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
>     " FROM " + rawTable +
>     " WHERE status='RETRY'" +
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
>
>
> But this ignores events that are older than 5 minutes.  Here’s my timestamp
> assigner:
>
> public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {
>
>   private final long maxTimeLag = 2000; // 2 seconds
>
>   @Override
>   public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>     return rr.getLastTry().getTime();
>   }
>
>   @Override
>   public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - maxTimeLag);
>   }
> }
>
> So, a couple of questions:
>
> 1. How can I get this query to recognize earlier events (before 5 minutes
> ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same
> thing using DataStream windowing?
>
> Thanks -
>
> -- Cindy

Re: Need guidance on a use case

2019-12-19 Thread Kurt Young
Hi Eva,

Correct me if I'm wrong. You have an unbounded Task stream and you
want to enrich the task events with User info. Meanwhile, the User table
is also changing over time, so you basically want, when a task event
arrives, to join it with the latest data of the User table and emit the
result. Even if the User table changes again, you don't want to re-trigger
the join operations that already happened and were emitted, right?

Best,
Kurt


On Fri, Dec 20, 2019 at 12:33 AM Timo Walther  wrote:

> Hi Eva,
>
> I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
> always joins an incoming record with all previous arrived records. Maybe
> Jark in CC has some idea?
>
> It might make sense to use the DataStream API instead with a connect()
> and CoProcessFunction where you can simply put the latest row into state
> and perform the joining and emission of a new row when required.
>
> Regards,
> Timo
>
>
> On 18.12.19 23:44, Eva Eva wrote:
> > Hi Team,
> >
> > I'm trying Flink for the first time and encountered an issue that I
> > would like to discuss and understand if there is a way to achieve my use
> > case with Flink.
> >
> > *Use case:* I need to perform unbounded stream joins on multiple data
> > streams by listening to different Kafka topics. I have a scenario to
> > join a column in a table with multiple columns in another table by
> > avoiding duplicate joins. The main concern is that I'm not able to avoid
> > duplicate joins.
> >
> > *Issue: *Given the nature of data, it is possible to have updates over
> > time, sent as new messages since Kafka is immutable. For a given key I
> > would like to perform join only on the latest message, whereas currently
> > Flink performs join against all messages with the key (this is what I'm
> > calling as duplicate joins issue).
> > Example: Say I have two Kafka streams "User" and "Task". And I want to
> > join "User" with multiple columns in "Task".
> > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" and
> > "Manager" in "Task".
> >
> > Assuming I created and registered DataStreams.
> > Below is my query:
> >
> >SELECT * FROM Task t
> > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
> > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
> > LEFT JOIN User uc ON t.Manager = uc.UserID
> >
> > Say I have 5 different messages in Kafka with UserID=1000, I don't want
> > to perform 5 joins instead I want to perform join with the only latest
> > message with UserID=1000. Is there any way to achieve this without using
> > Temporal Table Functions?
> >
> > *I cannot use Temporal Table Functions because of below reasons:*
> > 1. I need to trigger JOIN operation for every new message in Kafka.
> > Whereas new messages in Temporal Table don't trigger JOIN operation.
> > 2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can only
> > be used for INNER JOINS
> > 3. From what I understand, JOIN in Temporal Table can only be performed
> > using Primary key, so I won't be able to Join more than one key.
> >
> >
> > Could someone please help me with this? Please let me know if any of the
> > information is not clear or need more details.
> >
> >   If this is not the correct email id, could you please point me to the
> > correct one.
> >
> >
> > Thanks in advance!
>
>


Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread Abdul Qadeer
Hi!

I am using Flink 1.8.3 and facing an issue where job submission through
RestClusterClient times out on Akka (default value 10s). In previous Flink
versions there was an option to set a different timeout value just for the
submission client (ClusterClient config), but it looks like it is not honored
now, as job submission from the client no longer goes through Akka and uses
the same value as the Dispatcher. I would like to know how to increase
this timeout just for job submission without affecting other Akka threads
in the TaskManager/JobManager, or any other solution to the problem.

The relevant stack trace is pasted below:

"cause":{"commonElementCount":8,"localizedMessage":"Could not submit job
(JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job
(JobID:
26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed
to submit JobGraph.","message":"Failed to submit
JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal
server error., ]","message":"[Internal server error., ]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}


Re: Need guidance on a use case

2019-12-19 Thread Jark Wu
Hi Eva,

If I understand correctly,
1) the user stream is a changelog stream in which every record is an upsert
with a primary key, and you only want to join the latest one
2) if the user record is updated, you want to re-trigger the join
(retract & update the previously joined result)

If this is your requirement, fortunately, this use case can be solved in
Flink SQL v1.9 with the *blink planner*.
First, you can use Deduplicate[1] to convert the append stream to an
updating stream which keeps the last row.
Then join the Task stream with the deduplicated view. Below is the example:


Register the following query as "LatestUser" view:

SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME() DESC) AS rn
  FROM User
) WHERE rn = 1

Join on the "LatestUser":

SELECT * FROM Task t
  LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID
  LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID
  LEFT JOIN LatestUser uc ON t.Manager = uc.UserID
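
To make the "keeps the last row" idea concrete, here is a plain-Java model of a keep-last-row view that turns an append stream into an updating one. It is only a sketch of the semantics, not of how the planner implements deduplication: a new row for an existing key first retracts the previous row and then adds the new one. The names (`KeepLastRow`, `onRow`, `changelog`, `view`) and the `+`/`-` notation are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Models a keep-last-row view over an append-only stream; not Flink code. */
final class KeepLastRow {
    // Current content of the view: the last payload seen per UserID.
    private final Map<Integer, String> last = new HashMap<>();
    // Changes sent downstream: "+" adds a row, "-" retracts a previously added row.
    private final List<String> changelog = new ArrayList<>();

    /** Handles one appended row, retracting the older row for the same key if there is one. */
    void onRow(int userId, String payload) {
        String previous = last.put(userId, payload);
        if (previous != null) {
            changelog.add("-(" + userId + "," + previous + ")");
        }
        changelog.add("+(" + userId + "," + payload + ")");
    }

    List<String> changelog() {
        return changelog;
    }

    Map<Integer, String> view() {
        return last;
    }

    public static void main(String[] args) {
        KeepLastRow dedup = new KeepLastRow();
        dedup.onRow(1000, "alice");
        dedup.onRow(1000, "alicia"); // update for an existing key
        dedup.onRow(2000, "bob");
        System.out.println(dedup.changelog()); // [+(1000,alice), -(1000,alice), +(1000,alicia), +(2000,bob)]
        System.out.println(dedup.view().get(1000)); // alicia
    }
}
```

A join placed after such a view sees the retraction and the new row, which is what lets it update results it emitted earlier.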


Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication


On Fri, 20 Dec 2019 at 09:53, Kurt Young  wrote:

> Hi Eva,
>
> Correct me If i'm wrong. You have an unbounded Task stream and you
> want to enrich the User info to the task event. Meanwhile, the User table
> is also changing by the time, so you basically want that when task event
> comes, join the latest data of User table and emit the results. Even if
> the
> User table changes again, you don't want to re-trigger the join operation
> which happened before and already emitted, right?
>
> Best,
> Kurt
>
>
> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther  wrote:
>
>> Hi Eva,
>>
>> I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
>> always joins an incoming record with all previous arrived records. Maybe
>> Jark in CC has some idea?
>>
>> It might make sense to use the DataStream API instead with a connect()
>> and CoProcessFunction where you can simply put the latest row into state
>> and perform the joining and emission of a new row when required.
>>
>> Regards,
>> Timo
>>
>>
>> On 18.12.19 23:44, Eva Eva wrote:
>> > Hi Team,
>> >
>> > I'm trying Flink for the first time and encountered an issue that I
>> > would like to discuss and understand if there is a way to achieve my
>> use
>> > case with Flink.
>> >
>> > *Use case:* I need to perform unbounded stream joins on multiple data
>> > streams by listening to different Kafka topics. I have a scenario to
>> > join a column in a table with multiple columns in another table by
>> > avoiding duplicate joins. The main concern is that I'm not able to
>> avoid
>> > duplicate joins.
>> >
>> > *Issue: *Given the nature of data, it is possible to have updates over
>> > time, sent as new messages since Kafka is immutable. For a given key I
>> > would like to perform join only on the latest message, whereas
>> currently
>> > Flink performs join against all messages with the key (this is what I'm
>> > calling as duplicate joins issue).
>> > Example: Say I have two Kafka streams "User" and "Task". And I want to
>> > join "User" with multiple columns in "Task".
>> > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" and
>> > "Manager" in "Task".
>> >
>> > Assuming I created and registered DataStreams.
>> > Below is my query:
>> >
>> >SELECT * FROM Task t
>> > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
>> > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
>> > LEFT JOIN User uc ON t.Manager = uc.UserID
>> >
>> > Say I have 5 different messages in Kafka with UserID=1000, I don't want
>> > to perform 5 joins instead I want to perform join with the only latest
>> > message with UserID=1000. Is there any way to achieve this without
>> using
>> > Temporal Table Functions?
>> >
>> > *I cannot use Temporal Table Functions because of below reasons:*
>> > 1. I need to trigger JOIN operation for every new message in Kafka.
>> > Whereas new messages in Temporal Table don't trigger JOIN operation.
>> > 2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can only
>> > be used for INNER JOINS
>> > 3. From what I understand, JOIN in Temporal Table can only be performed
>> > using Primary key, so I won't be able to Join more than one key.
>> >
>> >
>> > Could someone please help me with this? Please let me know if any of
>> the
>> > information is not clear or need more details.
>> >
>> >   If this is not the correct email id, could you please point me to the
>> > correct one.
>> >
>> >
>> > Thanks in advance!
>>
>>


Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread tison
In previous versions there was an "akka.client.timeout" option, but it was
only used to time out the future on the client side, so I don't think it
changes the Akka-scope timeout.

Best,
tison.


On Fri, Dec 20, 2019 at 10:44 AM, Abdul Qadeer wrote:

> Hi!
>
> I am using Flink 1.8.3 and facing an issue where job submission through
> RestClusterClient times out on Akka (default value 10s). In previous Flink
> versions there was an option to set a different timeout value just for the
> submission client (ClusterClient config), but looks like it is not honored
> now as job submission from client is no more through Akka and it will use
> the same value present with Dispatcher. I wanted to know how to increase
> this timeout just for job submission without affecting other akka threads
> in TaskManager/JobManager, or any other solution for the problem.
>
> The relevant stack trace is pasted below:
>
> "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job
> (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job
> (JobID:
> 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed
> to submit JobGraph.","message":"Failed to submit
> JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal
> server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms].
> Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat
> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat
> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server
> side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms].
> Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat
> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat
> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server
> side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}
>


Re: Unit testing filter function in flink

2019-12-19 Thread vino yang
Hi Vishwas,

Apache Flink provides test harnesses for testing your application code at
multiple levels of the testing pyramid.

You can use them to test your UDF. See the examples in the official
documentation [1].

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
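
For the filter in the quoted question, one option is to move the predicate
out of the anonymous FilterFunction into a named class and call it directly
in a unit test, with no cluster involved. The following is only a minimal
sketch under assumptions: StipRecordFilter and the field-name strings are
hypothetical, and java.util.function.Predicate and Map stand in for Flink's
FilterFunction and Avro's GenericRecord so that the example runs without
those dependencies.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Stand-in for the anonymous FilterFunction in the question. In the real job this
// class would implement Flink's FilterFunction<GenericRecord>; Predicate and Map
// are used here only to keep the sketch free of Flink and Avro dependencies.
final class StipRecordFilter implements Predicate<Map<String, Object>> {
    // Hypothetical field names standing in for the StipFields constants.
    static final String MESSAGE_TYPE = "requestMessageType";
    static final String PCR = "pcrList";
    static final String REJECT_CODE = "rejectCode";
    static final String PROCESSING_CODE = "processingCode";

    private final Set<String> messageTypes;
    private final Set<String> pcrs;

    // The allowed values are passed in rather than read from a global object,
    // so a test can supply its own.
    StipRecordFilter(Set<String> messageTypes, Set<String> pcrs) {
        this.messageTypes = messageTypes;
        this.pcrs = pcrs;
    }

    @Override
    public boolean test(Map<String, Object> value) {
        if (value == null || value.get(MESSAGE_TYPE) == null) {
            return false;
        }
        Object pcr = value.get(PCR);
        Object processingCode = value.get(PROCESSING_CODE);
        // Unlike the original, missing fields reject the record instead of throwing.
        return messageTypes.contains(value.get(MESSAGE_TYPE).toString())
                && pcr != null && pcrs.contains(pcr.toString())
                && Integer.valueOf(0).equals(value.get(REJECT_CODE))
                && processingCode != null && !"33".equals(processingCode.toString());
    }
}
```

A test can then build a few records by hand and assert on test(...) for the
accepted case and for each rejection rule. Passing the allowed message types
and PCRs through the constructor, instead of reading them from ExecutionEnv
inside the function, is a design choice made here so that the test controls
them.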

On Fri, Dec 20, 2019 at 6:27 AM, Vishwas Siravara wrote:

> Hi guys,
> I want to test a function like :
>
> private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
>   dataStream.filter(new FilterFunction[GenericRecord] {
>     override def filter(value: GenericRecord): Boolean = {
>       if (value == null || value.get(StipFields.requestMessageType) == null) {
>         return false;
>       } else {
>         ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
>           ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
>           (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
>           !(value.get(StipFields.processingCode).toString.equals("33"))
>       }
>     }
>   })
> }
>
> How can I do this ?
>
> Best,
> Vishwas
>
>


Re: Flink Prometheus metric doubt

2019-12-19 Thread vino yang
Hi Jesus,

I think @Chesnay Schepler may be able to provide more information.

Best,
Vino

On Thu, Dec 19, 2019 at 6:57 PM, Jesús Vásquez wrote:

> Hi all, i'm monitoring Flink jobs using prometheus.
> I have been trying to use the metrics flink_jobmanager_job_uptime/downtime
> in order to create an alert, that fires when one of this values emits -1
> since the doc says this is the behavior of the metric when the job gets to
> a completed state.
> The thing is that i have tested the behavior when one of my job fails and
> the mentioned metrics never emit something different than zero. Finally the
> metric disappears after the job has failed.
> Am i missing something or is this the expected behavior ?
>


Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread Abdul Qadeer
The relevant config here is "akka.ask.timeout".

On Thu, Dec 19, 2019 at 6:51 PM tison  wrote:

> In previous version there is an "akka.client.timeout" option but it is
> only used for timeout the future in client side so I don't think it change
> akka scope timeout.
>
> Best,
> tison.


Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread Yang Wang
It seems this is not caused by the REST client timeout. It is a server-side
Akka timeout exception.
Could you share the jobmanager logs?

Best,
Yang

On Fri, Dec 20, 2019 at 10:59 AM, Abdul Qadeer wrote:

> The relevant config here is "akka.ask.timeout".


Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread tison
IIRC this issue is possibly caused by limited resources or some occasional
cause. I once heard that someone upgraded their Java version and the issue
vanished.

As for "akka.ask.timeout", it is the timeout used for all Akka ask requests.
And I second Yang that the timeout is unrelated to the client-server connection.

Best,
tison.


On Fri, Dec 20, 2019 at 11:02 AM, Yang Wang wrote:

> It seems that not because the timeout of rest client. It is a server side
> akka timeout exception.
> Could you share the jobmanager logs?
>
> Best,
> Yang

Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread tison
Forward to user list.

Best,
tison.


On Fri, Dec 20, 2019 at 12:57 PM, Abdul Qadeer wrote:

> Around submission time, logs from jobmanager:
>
> {"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received
> JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a
> (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
> {"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting
> job 714829e8f6c8cd0daaed335c1b8c588a
> (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
> {"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
> Actor[akka://flink/deadLetters] to
> Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}
> {"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
> Actor[akka://flink/deadLetters] to
> Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}
> {"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled
> exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed
> out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 ms].
> Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask
> timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1
> ms]. Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"run","file":"LightArrayRevolverScheduler.scala","line":236,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_66"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logg
ing.slf4j.Log4jLogger","threadId":41,"threadPriority":5}
> {"timeMillis":1576764877809,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Starting
> RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/jobmanager_5
> .","end

java.lang.NoClassDefFoundError due to Native library issue?

2019-12-19 Thread Hegde, Mahendra
Hello Team,

I am trying to use a timezone lookup service
(https://github.com/dustin-johnson/timezonemap) in my Flink Java job. It worked
fine on my local machine and initially on the Flink server as well, but after
2-3 restarts of the job it started throwing a NoClassDefFoundError:

java.lang.NoClassDefFoundError: Could not initialize class com.github.luben.zstd.ZstdInputStream
    at org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream.(ZstdCompressorInputStream.java:43)
    at us.dustinj.timezonemap.data.DataLocator.getDataInputStream(DataLocator.java:22)
    at us.dustinj.timezonemap.TimeZoneMap.forRegion(TimeZoneMap.java:92)
    at us.dustinj.timezonemap.TimeZoneMap.forEverywhere(TimeZoneMap.java:60)
    at dtap.service.TimezoneService.(TimezoneService.java:17)
    at functions.processors.KernelIntgrationProcessor.processElement(KernelIntgrationProcessor.java:47)
    at functions.processors.KernelIntgrationProcessor.processElement(KernelIntgrationProcessor.java:23)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)



When I looked at the code of the ZstdCompressorInputStream class, I saw that it
loads native libraries, and from the page below I found that a native library
unloading issue could be the reason for this.

https://www.ibm.com/support/knowledgecenter/SS7K4U_8.5.5/com.ibm.websphere.zseries.doc/ae/rtrb_classload_viewer.html

Has anyone faced a similar issue, or does anyone have an idea how to solve this problem?



Thanks,

Mahendra



Re: java.lang.NoClassDefFoundError due to Native library issue?

2019-12-19 Thread Ruidong Li
I've come across a similar issue before. The reason is that a dynamic link
library (so/dll) can be loaded by only one classloader at a time. When a
restart or failover happens in Flink, the JVM does not exit; it only creates
a new classloader, which leads to the same so/dll being loaded more than
once unless the previous classloader has been garbage collected. Here "the
same so/dll" means the same full path.

So here are two possible solutions:
1. Load a new so/dll: rename the so file or change its path. This is how the
Flink RocksDB state backend works.
2. Try using a fixed classloader.
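
As a rough illustration of option 1, the sketch below copies the library
bytes to a fresh temporary file before loading, so that every load uses a
path the JVM has not seen before. NativeLibExtractor and its method names
are hypothetical and not part of Flink or zstd-jni. A library that loads its
own native code internally cannot simply be switched to a helper like this,
so treat it as a picture of the idea rather than a drop-in fix.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of Flink or zstd-jni.
final class NativeLibExtractor {
    private NativeLibExtractor() {}

    /** Copies the library bytes to a new temp file, so each call yields a distinct path. */
    static Path extractToUniquePath(InputStream library, String prefix, String suffix) {
        try {
            // createTempFile picks a file name that does not exist yet.
            Path target = Files.createTempFile(prefix, suffix);
            Files.copy(library, target, StandardCopyOption.REPLACE_EXISTING);
            target.toFile().deleteOnExit();
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException("could not extract native library", e);
        }
    }

    /** Extracts the library and loads it; System.load requires an absolute path. */
    static void loadFresh(InputStream library, String prefix, String suffix) {
        Path copy = extractToUniquePath(library, prefix, suffix);
        System.load(copy.toAbsolutePath().toString());
    }
}
```

In a job, the InputStream would typically come from getResourceAsStream on a
library bundled in the jar. For option 2, one possible reading is to load the
library through a classloader that survives job restarts, but that is an
assumption about what was meant.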



Re: POJO ERROR

2019-12-19 Thread Alexandru Vasiu
Hi,

We fixed it by converting the case class to a class.

Thank you,
Alex

On Thu, Dec 19, 2019 at 5:43 PM Timo Walther  wrote:

> Sorry, you are right. Maybe you can also share the full stack trace
> because I don't know where this guava library should be used.
>
> Regards,
> Timo
>
>
> On 19.12.19 14:50, Alexandru Vasiu wrote:
> > Nope, because scalaBuildVersion is the scala version including minor
> > version so in this case: 2.12.10 and we used it just where we need.
> > We used scalaVersion to specify for each library what scala is used, so
> > used flink will be flink-streaming-scala_2.12
> >
> > Alex
> >
> > On Thu, Dec 19, 2019 at 3:40 PM Timo Walther  > > wrote:
> >
> > I see a mismatch between scalaBuildVersion and scalaVersion could
> this
> > be the issue?
> >
> > Regards,
> > Timo
> >
> >
> > On 19.12.19 14:33, Alexandru Vasiu wrote:
> >  > This is a part of my Gradle config:
> >  >
> >  > ext {
> >  >  scalaVersion = '2.12'
> >  >  flinkVersion = '1.9.1'
> >  >  scalaBuildVersion = "${scalaVersion}.10"
> >  >  scalaMockVersion = '4.4.0'
> >  >  circeGenericVersion = '0.12.3'
> >  >  circeExtrasVersion = '0.12.2'
> >  >  pardiseVersion = '2.1.1'
> >  >  slf4jVersion = '1.7.7'
> >  >  log4jVersion = '1.2.17'
> >  >  sourceDir = 'src/main/scala'
> >  >  testDir = 'src/test/scala'
> >  > }
> >  > repositories {
> >  >  mavenCentral()
> >  >  //maven { url
> >  > "https://repository.apache.org/content/repositories/snapshots/"; }
> >  > }
> >  > configurations {
> >  >  scalaCompilerPlugin
> >  > }
> >  > dependencies {
> >  >  implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
> >  >  // --
> >  >  // Compile-time dependencies that should NOT be part of the
> >  >  // shadow jar and are provided in the lib folder of Flink
> >  >  // --
> >  >  //compile "org.apache.flink:flink-java:${flinkVersion}"
> >  >  implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
> >  >  implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
> >  >  // --
> >  >  // Dependencies that should be part of the shadow jar, e.g.
> >  >  // connectors. These must be in the flinkShadowJar configuration!
> >  >  // --
> >  >  //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
> >  >  // https://mvnrepository.com/artifact/io.circe/
> >  >  implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
> >  >  implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
> >  >  implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
> >  >  // https://mvnrepository.com/artifact/org.scalamacros/paradise
> >  >  scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
> >  >  implementation "log4j:log4j:${log4jVersion}"
> >  >  implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
> >  >  // Add test dependencies here.
> >  >  //testImplementation "junit:junit:4.12"
> >  >  testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
> >  >  // https://mvnrepository.com/artifact/org.scalamock/scalamock
> >  >  testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
> >  > }
> >  >
> >  > So all of them use the same Scala version. I cannot share the code,
> >  > but the main app looks like this:
> >  >
> >  > val env = StreamExecutionEnvironment.getExecutionEnvironment
> >  > val stream = env
> >  >   // This gives us a stream of our object model, which looks like:
> >  >   // case class A(a: Map[String, other_case_class_obj],
> >  >   //              b: List[other_case_class_obj], c: String)
> >  >   .addSource(KAFKA_STREAM)
> >  >   .flatMap(CustomFlatMap())
> >  >   .print
> >  >
> >  > Thank you,
> >  > Alex
> >  >
> >  > On Thu, Dec 19, 2019 at 3:14 PM Timo Walther wrote:
> >  >
> >  > That sounds like a classloading or, most likely, a dependency
> >  > issue.
> >  >
> >  > Are all dependencie