Re: RocksDB Read IOPs

2018-09-25 Thread Yun Tang
Hi Ning

From your description, I think you are actually more concerned about the overall 
performance than about the high disk IOPs. Maybe you should first confirm 
whether the job's performance degradation is related to RocksDB's performance.

Then let me share some experience on tuning RocksDB performance. Since you 
did not cache index and filter blocks in the block cache, there is no need to worry 
about competition between data blocks and index & filter blocks [1]. To improve 
read performance, you should increase your block cache size to 256MB or even 
512MB. What's more, the write buffers in RocksDB also play a role for reads; 
from our experience, we use 4 max write buffers of 32MB each, e.g. 
setMaxWriteBufferNumber(4) and setWriteBufferSize(32*1024*1024).
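For example, a minimal sketch of an options factory with these settings (the class
name and the 256MB figure are only illustrative, please adapt them to your job):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class TunedRocksDBOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
            .setIncreaseParallelism(4)
            .setUseFsync(false)
            .setMaxOpenFiles(-1);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
            // a larger block cache helps read performance
            .setTableFormatConfig(
                new BlockBasedTableConfig().setBlockCacheSize(256 * 1024 * 1024L))
            // write buffers are also consulted on reads
            .setMaxWriteBufferNumber(4)
            .setWriteBufferSize(32 * 1024 * 1024L);
    }
}

You can then plug it in via RocksDBStateBackend#setOptions.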

Best
Yun Tang

[1] 
https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks




From: Ning Shi 
Sent: Wednesday, September 26, 2018 11:25
To: user
Subject: RocksDB Read IOPs

Hi,

I'm benchmarking a job with large state in various window sizes
(hourly, daily). I noticed that it would consistently slow down after
30 minutes into the benchmark due to high disk read IOPs. The first 30
minutes were fine, with close to 0 disk IOPs. Then after 30 minutes,
read IOPs would gradually climb to as high as 10k/s. At this point,
the job was bottlenecked on disk IOPs because I'm using a 2TB EBS-backed
volume.

Another thread on the mailing list mentioned that running out of burst IOPs
credits could potentially cause a slowdown. That's not the case here because
I'm using a 2TB EBS volume.

Someone also mentioned RocksDB compaction could potentially increase
read IOPs a lot.

I'm currently running the job with these RocksDB settings.

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
    return currentOptions
        .setIncreaseParallelism(4)
        .setUseFsync(false)
        .setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    final long blockCacheSize = 64 * 1024 * 1024;
    return currentOptions
        .setTableFormatConfig(
            new BlockBasedTableConfig()
                .setBlockCacheSize(blockCacheSize)
        );
}

Any insights into how I can further diagnose this? Is there any way to
see compaction stats, or any settings I should try?

Thanks,

Ning


Re: "405 HTTP method POST is not supported by this URL" is returned when POST a REST url on Flink on yarn

2018-09-25 Thread Gary Yao
Hi Henry,

The URL below looks like the one from the YARN proxy (note that "proxy"
appears in the URL):


http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/savepoints

You can use

yarn application -status <applicationId>

to find the host and port of the application master (AM host & RPC Port).
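
Once you have the AM address, you can send the POST request directly to it. A rough
sketch (the host, port, job id, and target directory are placeholders you need to adapt):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        // placeholders: AM host/port as reported by YARN, plus your job id
        String amAddress = "http://<am-host>:<am-port>";
        String jobId = "<job-id>";

        URL url = new URL(amAddress + "/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        String body = "{\"target-directory\": \"hdfs:///path/to/savepoints\", \"cancel-job\": false}";
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Response code: " + conn.getResponseCode());
    }
}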

Best,
Gary

On Wed, Sep 26, 2018 at 3:23 AM 徐涛  wrote:

> Hi Till,
> Actually I do send to request to the application master:
> "
> http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/savepoints
> ”
> with post body
> { "target-directory" : "hdfs://flinkDsl/xxx", "cancel-job" : "true” }
> If I use the following GET url, everything is OK
> "
> http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/checkpoints
> "
>
> Best
> Henry
>
>
> On 26 Sep 2018, at 05:32, Till Rohrmann wrote:
>
> Hi Henry,
>
> I think when running Flink on Yarn, then you must not go through the Yarn
> proxy. Instead you should directly send the post request to the node on
> which the application master runs. When starting a Flink Yarn session via
> yarn-session.sh, then the web interface URL is printed to stdout, for
> example.
>
> Cheers,
> Till
>
> On Tue, Sep 25, 2018 at 9:43 AM 徐涛  wrote:
>
>> Hi All,
>> I am trying to POST a RESTful url and want to generate a savepoint, the
>> Flink version is 1.6.0.
>> When I executed the POST in local, everything is OK, but when I POST the
>> url on a Flink on YARN application. The following error is returned:
>> “405 HTTP method POST is not supported by this URL”, I guess it is caused
>> by YARN limitation. (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/REST-API-quot-broken-quot-on-YARN-because-POST-is-not-allowed-via-YARN-proxy-td19329.html
>> )
>> But does it have a workaround now?
>>
>> Best
>> Henry
>>
>
>


Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread clay4444
hi Till:

I have solved the problem.
The reason is that the flink-json dependency added to the pom didn't take effect:
the flink-json-xxx.jar must be copied into Flink's ./lib/ directory.
...



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread clay4444
hi Till:

thank you for your reply

Here are some details:

I submit my task to YARN with the following command:
./bin/flink  run  -c   org.clay.test.TestX   flinkTools-1.0.jar

My pom looks like this:


1.6.0
provided





scala-tools.org
Scala-Tools Maven2 Repository
http://scala-tools.org/repo-releases





scala-tools.org
Scala-Tools Maven2 Repository
http://scala-tools.org/repo-releases









com.alibaba
fastjson
1.2.49






net.minidev
json-smart
2.3





org.apache.parquet
parquet-avro
1.8.1





org.scala-lang
scala-library
${scala.version}





org.apache.hbase
hbase-client
1.2.0





joda-time
joda-time
${joda-time.version}




org.json4s
json4s-jackson_2.11
3.2.11


jackson-databind
com.fasterxml.jackson.core





org.json4s
json4s-native_2.11
3.2.11



org.json4s

json4s-core_2.11
3.2.11



com.fasterxml.jackson.module
jackson-module-scala_2.11
2.6.5



com.fasterxml.jackson.core
jackson-databind
2.6.5




ch.qos.logback
logback-classic
1.1.3




org.apache.flink
flink-table_2.11
${flink.version}
${isLocal}



org.apache.flink
flink-connector-filesystem_2.11
${flink.version}



org.apache.flink
flink-connector-kafka-0.11_2.11
${flink.version}




org.apache.flink
flink-streaming-scala_2.11
${flink.version}



org.apache.flink
flink-core
${flink.version}
${isLocal}



org.apache.flink
flink-runtime_2.11
${flink.version}
${isLocal}



org.apache.flink
flink-json
${flink.version}




org.apache.flink
flink-hbase_2.11
${flink.version}










org.apache.flink
flink-connector-elasticsearch6_2.11
${flink.version}



org.apache.httpcomponents
httpcore
4.4.10


org.apache.httpcomponents
httpclient
4.5




org.apache.hadoop
hadoop-hdfs
2.6.0
${isLocal}


xml-apis
xml-apis





org.apache.hadoop
hadoop-common
2.6.0
${isLocal}






src/main/scala
src/test/scala


org.scala-tools
maven-scala-plugin



compile
testCompile




${scala.version}

-target:jvm-1.5




org.apache.maven.plugins
maven-eclipse-plugin

true

   
ch.epfl.lamp.sdt.core.scalabuilder


   
ch.epfl.lamp.sdt.core.scalanature


   
org.eclipse.jdt.launching.JRE_CONTAINER
   
ch.epfl.lamp.sdt.launching.SCALA_CONTAINER












org.apache.maven.plugins
maven-compiler-plugin

1.7
1.7





org.apache.maven.plugi

RocksDB Read IOPs

2018-09-25 Thread Ning Shi
Hi,

I'm benchmarking a job with large state in various window sizes
(hourly, daily). I noticed that it would consistently slow down after
30 minutes into the benchmark due to high disk read IOPs. The first 30
minutes were fine, with close to 0 disk IOPs. Then after 30 minutes,
read IOPs would gradually climb to as high as 10k/s. At this point,
the job was bottlenecked on disk IOPs because I'm using a 2TB EBS-backed
volume.

Another thread on the mailing list mentioned that running out of burst IOPs
credits could potentially cause a slowdown. That's not the case here because
I'm using a 2TB EBS volume.

Someone also mentioned RocksDB compaction could potentially increase
read IOPs a lot.

I'm currently running the job with these RocksDB settings.

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
    return currentOptions
        .setIncreaseParallelism(4)
        .setUseFsync(false)
        .setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    final long blockCacheSize = 64 * 1024 * 1024;
    return currentOptions
        .setTableFormatConfig(
            new BlockBasedTableConfig()
                .setBlockCacheSize(blockCacheSize)
        );
}
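
One thing I'm considering is asking RocksDB to periodically dump its stats (which
include compaction stats) to its LOG file, a rough sketch, assuming the bundled
RocksDB version exposes setStatsDumpPeriodSec:

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
    return currentOptions
        .setIncreaseParallelism(4)
        .setUseFsync(false)
        .setMaxOpenFiles(-1)
        // dump DB stats (including compaction stats) to the RocksDB LOG every 5 minutes
        .setStatsDumpPeriodSec(300);
}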

Any insights into how I can further diagnose this? Is there any way to
see compaction stats, or any settings I should try?

Thanks,

Ning


Intermittent issue with GCS storage

2018-09-25 Thread Heath Albritton
Howdy folks,

I'm attempting to get Flink running in a Kubernetes cluster with the
ultimate goal of using GCS for checkpoints and savepoints.  I've used
the helm chart to deploy and followed this guide, modified for 1.6.0:

https://data-artisans.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine

I've built a container putting these:
flink-shaded-hadoop2-uber-1.6.0.jar
gcs-connector-hadoop2-latest.jar
in /opt/flink/lib

I've been running WordCount.jar to test, using the input and output
flags pointing at a GCS bucket.  I've verified that my two jars show
up in the classpath in the logs, but when I run the job it throws the
following errors:

flink-flink-jobmanager-9766f9b4c-kfkk5: Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could
not find a file system implementation for scheme 'gs'. The scheme is
not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.
flink-flink-jobmanager-9766f9b4c-kfkk5: at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
flink-flink-jobmanager-9766f9b4c-kfkk5: at
org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
flink-flink-jobmanager-9766f9b4c-kfkk5: at
org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
flink-flink-jobmanager-9766f9b4c-kfkk5: at
org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:275)
flink-flink-jobmanager-9766f9b4c-kfkk5: at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:89)
flink-flink-jobmanager-9766f9b4c-kfkk5: at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)

I've been wrestling with this a fair bit.  Eventually I built a
container with the core-site.xml and the GCS key in the
/opt/flink/etc-hadoop directory and then set HADOOP_CONF_DIR to point
there.

I've discovered that if I run the container in standalone mode using
the start-cluster.sh script, it works just fine.  I can replicate this
in Kubernetes as well as locally using Docker.

If I start the job manager and the task manager individually using
their respective scripts, I get the aforementioned error.  Oddly, I
see issues when running locally as well: if I use the start-cluster.sh
script, my WordCount test works just fine, but if I start the job manager
and task manager processes using their individual scripts, I can read the
file from GCS yet get a 403 when trying to write the output.

I've no idea how to proceed with troubleshooting this further as I'm a
newbie to flink.  Some direction would be helpful.


Cheers,

Heath Albritton


Re: How to get the location of keytab when using flink on yarn

2018-09-25 Thread 杨光
Hi Aljoscha,
Sorry for my late response. In my experience, if
flink-conf.yaml sets "security.kerberos.login.keytab" and
"security.kerberos.login.contexts" with a keytab file, then YARN will
ship the keytab file to the TaskManager.
I can also find a log line like:
 " INFO  org.apache.flink.configuration.GlobalConfiguration-
Loading configuration property: security.kerberos.login.keytab,
/data1/yarn/nm/usercache/hadoop/appcache/application_1537515506704_0007/container_e28_1537515506704_0007_01_01/krb5.keytab"
in the TaskManager log.
My problem is: in the user code, such as a map or sink function, how can I get
the security.kerberos.login.keytab value to use for the login?
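
For context, this is roughly what I am trying to do (a sketch; the class name is
made up, and how to resolve keytabPath on the TaskManager is exactly the open
question, so here it is simply passed in from outside):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class HBaseEnrichMapper extends RichMapFunction<String, String> {

    private final String principal;
    private final String keytabPath; // how to obtain this value is the question

    public HBaseEnrichMapper(String principal, String keytabPath) {
        this.principal = principal;
        this.keytabPath = keytabPath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // log in to the kerberized HBase cluster before creating the connection
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        // ... initialize the HBase connection here ...
    }

    @Override
    public String map(String value) {
        return value; // the actual HBase lookup is omitted
    }
}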

THANKS

Aljoscha Krettek wrote on Thu, 20 Sep 2018, at 21:44:

> Hi,
>
> if the YARN cluster does not have Kerberos enabled then Flink will not
> ship the keytab file to the worker nodes. This means that you have to make
> sure it is available at some location where your application can use it
> yourself. But this might have security risks.
>
> I'm afraid I don't know a better option now, maybe Eron knows one.
>
> On 20. Sep 2018, at 19:51, Stefan Richter 
> wrote:
>
> Hi,
>
> maybe Aljoscha or Eron (both in CC) can help you with this problem, I
> think they might know best about the Kerberos security.
>
> Best,
> Stefan
>
> On 20.09.2018 at 11:20, 杨光 wrote:
>
> Hi,
> i am using  the " per-job YARN session " mode deploy flink job on yarn and
> my flink
> version is 1.4.1.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-kerberos.html
>
> My use case is the yarn cluster where the flink job running is not enabled
> the kerberos mode in core-site.xml ,but i am trying to connecting an HBase
> cluster which is enabled kerberos. So i have to use the
> loginUserFromKeytab() method to init kerberos infomation before  init the
> HBase connection.
>
>  UserGroupInformation.loginUserFromKeytab(user, keytabLocation);
>
> So how can i get the keytab location  in my user code  ,  or is there any
> better ideas to solve the HBase kerberos problem on a yarn not using
> kerberos mode.
>
>  THANKS
>
>
>
>


Re: Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch

2018-09-25 Thread Hequn Cheng
Hi bastien,

Could you give more details about your scenario? Do you want to load
another file from the same Kafka topic after the current file has been processed?
I am curious why you want to join the data in a bounded way when the
input data is a stream. The stream-stream join outputs the same results as
a batch join, so you don't have to wait until all data is received before
joining.
If you do want to process the data this way, I think you can use a UDTF
to achieve this behavior: in the UDTF, the data arriving from Kafka is buffered,
and you can perform the processing (join) once all of the elements have been
received.
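
For reference, a minimal skeleton of such a table function (the class name and the
emitted value are placeholders; the buffering/joining logic is up to you):

import org.apache.flink.table.functions.TableFunction;

// hypothetical skeleton of a user-defined table function (UDTF)
public class EnrichmentUdtf extends TableFunction<String> {

    // called once per input row; may emit zero or more rows via collect()
    public void eval(String key) {
        // look up / buffer the bounded data for this key here and emit each match
        collect("enriched-" + key); // placeholder output
    }
}

It would be registered with tableEnv.registerFunction("enrich", new EnrichmentUdtf())
and used in SQL via LATERAL TABLE(enrich(col)).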

Best, Hequn

On Tue, Sep 25, 2018 at 10:07 PM bastien dine 
wrote:

> Hi Hequn,
>
> Thanks for your response
> Yea I know about the table API, but I am searching a way to have a bounded
> context with a stream, somehow create a dataset from a buffer store in a
> window of datastream
>
> Regards, Bastien
>
> On Tue, 25 Sep 2018 at 14:50, Hequn Cheng wrote:
>
>> Hi bastien,
>>
>> Flink features two relational APIs, the Table API and SQL. Both APIs are
>> unified APIs for batch and stream processing, i.e., queries are executed
>> with the same semantics on unbounded, real-time streams or bounded[1].
>> There are also documents about Join[2].
>>
>> Best, Hequn
>> [1] https://flink.apache.org/flink-applications.html#layered-apis
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>>
>> On Tue, Sep 25, 2018 at 4:14 PM bastien dine 
>> wrote:
>>
>>> Hello everyone,
>>>
>>> I need to join some files to perform some processing.. The dataset API
>>> is a perfect way to achieve this, I am able to do it when I read file in
>>> batch (csv)
>>>
>>> However in the prod environment, I will receive thoses files in kafka
>>> messages (one message = one line of a file)
>>> So I am considering using a global window + a custom trigger on a end of
>>> file message and a process window function.
>>> But I can not go too far with that as process is only one function and
>>> chaining functions will be a pain. I don't think that emitting a datastream
>>> & windows / trigger on EOF before every process function is a good idea
>>>
>>> However I would like to work in a bounded way once I received all of my
>>> elements (after the trigger on global window), like the dataset API, as I
>>> will join on my whole dataset..
>>>
>>> I thought maybe it would be a good idea to go for table API and group
>>> window ? but you can not have custom trigger and a global group window on a
>>> table ?(like the global window on datastream ?)
>>> Best alternative would be to create a dataset as a result of my process
>>> window function.. but I don't think this is possible, is it ?
>>>
>>> Best Regards,
>>> Bastien
>>>
>>


Re: "405 HTTP method POST is not supported by this URL" is returned when POST a REST url on Flink on yarn

2018-09-25 Thread 徐涛
Hi Till,
Actually I do send to request to the application master:

"http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/savepoints
 
”
with post body
{ "target-directory" : "hdfs://flinkDsl/xxx", "cancel-job" : "true” }

If I use the following GET url, everything is OK

"http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/checkpoints
 
"


Best
Henry


> On 26 Sep 2018, at 05:32, Till Rohrmann wrote:
> 
> Hi Henry,
> 
> I think when running Flink on Yarn, then you must not go through the Yarn 
> proxy. Instead you should directly send the post request to the node on which 
> the application master runs. When starting a Flink Yarn session via 
> yarn-session.sh, then the web interface URL is printed to stdout, for example.
> 
> Cheers,
> Till
> 
> On Tue, Sep 25, 2018 at 9:43 AM 徐涛  > wrote:
> Hi All,
>   I am trying to POST a RESTful url and want to generate a savepoint, the 
> Flink version is 1.6.0.
>   When I executed the POST in local, everything is OK, but when I POST 
> the url on a Flink on YARN application. The following error is returned:
>   “405 HTTP method POST is not supported by this URL”, I guess it is 
> caused by YARN limitation. 
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/REST-API-quot-broken-quot-on-YARN-because-POST-is-not-allowed-via-YARN-proxy-td19329.html
>  
> )
>   But does it have a workaround now?
> 
> Best
> Henry



Re: Scheduling sources

2018-09-25 Thread Averell
Thank you Till.

My use case is like this: I have two streams, one is raw data (1), the
other is enrichment data (2), which in turn consists of two components:
initial enrichment data (2a), which comes from an RDBMS table, and
incremental data (2b), which comes from a Kafka stream. To ensure that (1)
gets enriched properly, I want to have (2a) fully loaded into memory
before starting to process (1).

Is there any workaround for me in this case?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question about Window Tigger

2018-09-25 Thread Chang Liu
Hi Till,

You mean using AssignerWithPeriodicWatermarks but combining the logic of event time 
and processing time (that is, deciding based on the elapsed processing time, while 
the watermark value itself still depends on event time, right)?

One additional question: where do I configure 
ExecutionConfig.setAutoWatermarkInterval(…)?
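
What I have found so far is something like the following (a small sketch; please
correct me if this is not the intended place):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// emit watermarks every 200 ms of processing time
env.getConfig().setAutoWatermarkInterval(200L);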

Many thanks :)

Best regards/祝好,

Chang Liu 刘畅


> On 25 Sep 2018, at 23:41, Till Rohrmann  wrote:
> 
> AssignerWithPeriodicWatermarks



Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread Till Rohrmann
Hi,

I've pulled in Timo, who can help you with your problem.

Cheers,
Till

On Tue, Sep 25, 2018 at 12:02 PM clay  wrote:

> hi:
> I am using flink's table api, I receive data from kafka, then register it
> as
> a table, then I use sql statement to process, and finally convert the
> result
> back to a stream, write to a directory, the code looks like this:
>
> def main(args: Array[String]): Unit = {
>
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val tEnv = TableEnvironment.getTableEnvironment(sEnv)
>
>tEnv.connect(
>   new Kafka()
> .version("0.11")
> .topic("user")
> .startFromEarliest()
> .property("zookeeper.connect", "")
> .property("bootstrap.servers", "")
> )
>   .withFormat(
> new Json()
>   .failOnMissingField(false)
>   .deriveSchema()   // use the table's schema
>   )
>   .withSchema(
> new Schema()
>   .field("username_skey", Types.STRING)
>   )
>   .inAppendMode()
>   .registerTableSource("user")
>  val userTest: Table = tEnv.sqlQuery(
>   """
>select ** form ** join **"".stripMargin)
> val endStream = tEnv.toRetractStream[Row](userTest)
> endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE)
> sEnv.execute("Test_New_Sign_Student")
>  }
>
> I was successful in the local test, but when I submit the following command
> in the cluster, I get the following error:
>
> ===
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could
> not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> Reason: No factory implements
> 'org.apache.flink.table.factories.DeserializationSchemaFactory'.
>
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> 
> schema.9.name=roles
> schema.9.type=VARCHAR
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
>
>
> at
>
> org.apache.flink.table.factories.TableFactoryService$.filterByFactoryClass(TableFactoryService.scala:176)
> at
>
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:125)
> at
>
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
> at
>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
> at
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
> at
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
> at
>
> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:50)
> at
>
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44)
> at
> org.clay.test.Test_New_Sign_Student$.main(Test_New_Sign_Student.scala:64)
> at
> org.clay.test.Test_New_Sign_Student.main(Test_New_Sign_Student.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>

Re: JobManager container is running beyond physical memory limits

2018-09-25 Thread Till Rohrmann
Hi,

what changed between version 1.4.2 and 1.5.2 was the addition of the
application level flow control mechanism which changed a bit how the
network buffers are configured. This could be a potential culprit.

Since you said that the container ran for some time, I'm wondering whether
there is a resource leak somewhere. In order to debug this, a heap dump
would be tremendously helpful.

Cheers,
Till

On Tue, Sep 25, 2018 at 11:27 AM eSKa  wrote:

> We don't set it anywhere, so I guess it's the default of 16. Do you think
> that's too much?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Question about Window Tigger

2018-09-25 Thread Till Rohrmann
Hi Chang Liu,

maybe you could use the AssignerWithPeriodicWatermarks to build a custom
watermark assigner which creates the watermarks based on the incoming
events and if it detects that no events are coming, that it progresses the
watermark with respect to the wall clock time.
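
A rough, untested sketch of what I mean (MyEvent, its getEventTime() accessor, and
the two timeout constants are placeholders):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long MAX_OUT_OF_ORDERNESS = 5_000L; // placeholder
    private static final long IDLE_TIMEOUT = 10_000L;        // placeholder

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getEventTime());
        lastEventSeenAt = System.currentTimeMillis();
        return element.getEventTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastEventSeenAt > IDLE_TIMEOUT) {
            // no events for a while: advance the watermark based on the wall clock
            // so that pending windows can still fire
            return new Watermark(now - MAX_OUT_OF_ORDERNESS);
        }
        return new Watermark(maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}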

Cheers,
Till

On Tue, Sep 25, 2018 at 10:36 AM Chang Liu  wrote:

> Hi Rong,
>
> Thanks for your reply. :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
> On 19 Sep 2018, at 18:20, Rong Rong  wrote:
>
> Hi Chang,
>
> There were some previous discussion regarding how to debug watermark and
> window triggers[1].
> Basically if there's no data for some partitions there's no way to advance
> watermark. As it would not be able to determine whether this is due to
> network failure or actually there's no data arriving at the source.
> I think your use case is better of using SlidingProcessingTimeWindow.
>
> Thanks,
> Rong
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/watermark-does-not-progress-td22315.html
>
> On Wed, Sep 19, 2018 at 1:48 AM Chang Liu  wrote:
>
>> Dear All,
>>
>> I have a question about the Window Trigger: let’s say i would like like
>> use the SlidingEventTimeWindow (60 seconds window size + 1 second window
>> shift) to count the number of records per window. And I am using Event Time
>> with periodic watermarking with certain maxOurOfOrderness time.
>>
>> Sometimes, what happens is: during certain time, there is no incoming
>> event, and then the watermark for triggering the window fire is not
>> coming.  Then, the last several records will be just stayed in the window.
>> It will fire only when the window sees the watermark to trigger.
>>
>> What I would like to achieve is: if there is no just watermark coming
>> within certain time (maybe this time is system clock time?), I can still
>> trigger the window to fire no matter whether there is new event coming or
>> not. Then I can still get the window count for this window, without waiting
>> the next event, which could be coming after a long time.
>>
>> Do you have any idea how can I do this? Many Thanks :)
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>>
>


Re: "405 HTTP method POST is not supported by this URL" is returned when POST a REST url on Flink on yarn

2018-09-25 Thread Till Rohrmann
Hi Henry,

I think when running Flink on Yarn, then you must not go through the Yarn
proxy. Instead you should directly send the post request to the node on
which the application master runs. When starting a Flink Yarn session via
yarn-session.sh, then the web interface URL is printed to stdout, for
example.

Cheers,
Till

On Tue, Sep 25, 2018 at 9:43 AM 徐涛  wrote:

> Hi All,
> I am trying to POST a RESTful url and want to generate a savepoint, the
> Flink version is 1.6.0.
> When I executed the POST in local, everything is OK, but when I POST the
> url on a Flink on YARN application. The following error is returned:
> “405 HTTP method POST is not supported by this URL”, I guess it is caused
> by YARN limitation. (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/REST-API-quot-broken-quot-on-YARN-because-POST-is-not-allowed-via-YARN-proxy-td19329.html
> )
> But does it have a workaround now?
>
> Best
> Henry
>


Re: Strange behavior of FsStateBackend checkpoint when local executing

2018-09-25 Thread Till Rohrmann
Hi Henry,

which version of Flink are you using? If you could provide us with a
working example to reproduce the problem, then I'm sure that we can figure
out why it is not working as expected.

Cheers,
Till

On Tue, Sep 25, 2018 at 8:44 AM 徐涛  wrote:

> Hi All,
> I am using a FsStateBackend in local execution and I set the checkpoint
> cleanup mode to DELETE_ON_CANCELLATION. When I click the “stop” button in
> IntelliJ IDEA, the log shows that the job switched to the CANCELED state, but
> when I check the local file system, the checkpoint directory and files still
> exist.
> Furthermore, after I rm the checkpoint directory and
> execute the main function again, the Flink program still seems to read
> the old data (the dirty data that leads to an exception in
> Kafka). In my understanding, because there is no checkpoint now, it should by
> default read the latest Kafka data. So why does this happen?
> Thanks a lot!
>
> Best
> Henry


Re: Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-25 Thread Till Rohrmann
Hi Jamie,

thanks for the update on how to fix the problem. This is very helpful for
the rest of the community.

The change of removing the execution mode parameter (FLINK-8696) from the
start up scripts was actually released with Flink 1.5.0. That way, the host
name became the 2nd parameter. By calling the start up scripts with the old
syntax, the execution mode parameter was interpreted as the hostname. This
host name option was, however, not properly evaluated until we fixed it
with Flink 1.5.4. Therefore, the problem only surfaced now.

We definitely need to treat the start up scripts as a stable API as well.
So far, we don't have good tooling which ensures that we don't introduce
breaking changes. In the future we need to be more careful!

Cheers,
Till

On Tue, Sep 25, 2018 at 8:54 PM Jamie Grier  wrote:

> Update on this:
>
> The issue was the command being used to start the jobmanager:
> `jobmanager.sh start-foreground cluster`.  This was a command leftover in
> our automation that used to be the correct way to start the JM -- however
> now, in Flink 1.5.4, that second parameter, `cluster`, is being interpreted
> as the hostname for the jobmanager to bind to.
>
> The solution was just to remove `cluster` from that command.
>
>
>
> On Tue, Sep 25, 2018 at 10:15 AM Jamie Grier  wrote:
>
>> Anybody else seen this and know the solution?  We're dead in the water
>> with Flink 1.5.4.
>>
>> On Sun, Sep 23, 2018 at 11:46 PM alex  wrote:
>>
>>> We started to see same errors after upgrading to flink 1.6.0 from 1.4.2.
>>> We
>>> have one JM and 5 TM on kubernetes. JM is running on HA mode.
>>> Taskmanagers
>>> sometimes are loosing connection to JM and having following error like
>>> you
>>> have.
>>>
>>> *2018-09-19 12:36:40,687 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>>> resolve ResourceManager address
>>> akka.tcp://flink@flink-jobmanager:50002/user/resourcemanager, retrying
>>> in
>>> 1 ms: Ask timed out on
>>> [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:50002/),
>>> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent
>>> message of
>>> type "akka.actor.Identify"..*
>>>
>>> When TM started to have "Could not resolve ResourceManager", it cannot
>>> resolve itself until I restart the TM pod.
>>>
>>> *Here is the content of our flink-conf.yaml:*
>>> blob.server.port: 6124
>>> jobmanager.rpc.address: flink-jobmanager
>>> jobmanager.rpc.port: 6123
>>> jobmanager.heap.mb: 4096
>>> jobmanager.web.history: 20
>>> jobmanager.archive.fs.dir: s3://our_path
>>> taskmanager.rpc.port: 6121
>>> taskmanager.heap.mb: 16384
>>> taskmanager.numberOfTaskSlots: 10
>>> taskmanager.log.path: /opt/flink/log/output.log
>>> web.log.path: /opt/flink/log/output.log
>>> state.checkpoints.num-retained: 3
>>> metrics.reporters: prom
>>> metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>>
>>> high-availability: zookeeper
>>> high-availability.jobmanager.port: 50002
>>> high-availability.zookeeper.quorum: zookeeper_instance_list
>>> high-availability.zookeeper.path.root: /flink
>>> high-availability.cluster-id: profileservice
>>> high-availability.storageDir: s3://our_path
>>>
>>> Any help will be greatly appreciated!
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-25 Thread Jamie Grier
Update on this:

The issue was the command being used to start the jobmanager:
`jobmanager.sh start-foreground cluster`.  This was a command leftover in
our automation that used to be the correct way to start the JM -- however
now, in Flink 1.5.4, that second parameter, `cluster`, is being interpreted
as the hostname for the jobmanager to bind to.

The solution was just to remove `cluster` from that command.



On Tue, Sep 25, 2018 at 10:15 AM Jamie Grier  wrote:

> Anybody else seen this and know the solution?  We're dead in the water
> with Flink 1.5.4.
>
> On Sun, Sep 23, 2018 at 11:46 PM alex  wrote:
>
>> We started to see same errors after upgrading to flink 1.6.0 from 1.4.2.
>> We
>> have one JM and 5 TM on kubernetes. JM is running on HA mode. Taskmanagers
>> sometimes are loosing connection to JM and having following error like you
>> have.
>>
>> *2018-09-19 12:36:40,687 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>> resolve ResourceManager address
>> akka.tcp://flink@flink-jobmanager:50002/user/resourcemanager, retrying in
>> 1 ms: Ask timed out on
>> [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:50002/),
>> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
>> of
>> type "akka.actor.Identify"..*
>>
>> When TM started to have "Could not resolve ResourceManager", it cannot
>> resolve itself until I restart the TM pod.
>>
>> *Here is the content of our flink-conf.yaml:*
>> blob.server.port: 6124
>> jobmanager.rpc.address: flink-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.mb: 4096
>> jobmanager.web.history: 20
>> jobmanager.archive.fs.dir: s3://our_path
>> taskmanager.rpc.port: 6121
>> taskmanager.heap.mb: 16384
>> taskmanager.numberOfTaskSlots: 10
>> taskmanager.log.path: /opt/flink/log/output.log
>> web.log.path: /opt/flink/log/output.log
>> state.checkpoints.num-retained: 3
>> metrics.reporters: prom
>> metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>
>> high-availability: zookeeper
>> high-availability.jobmanager.port: 50002
>> high-availability.zookeeper.quorum: zookeeper_instance_list
>> high-availability.zookeeper.path.root: /flink
>> high-availability.cluster-id: profileservice
>> high-availability.storageDir: s3://our_path
>>
>> Any help will be greatly appreciated!
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-25 Thread Jamie Grier
Anybody else seen this and know the solution?  We're dead in the water with
Flink 1.5.4.

On Sun, Sep 23, 2018 at 11:46 PM alex  wrote:

> We started to see same errors after upgrading to flink 1.6.0 from 1.4.2. We
> have one JM and 5 TM on kubernetes. JM is running on HA mode. Taskmanagers
> sometimes are loosing connection to JM and having following error like you
> have.
>
> *2018-09-19 12:36:40,687 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address
> akka.tcp://flink@flink-jobmanager:50002/user/resourcemanager, retrying in
> 1 ms: Ask timed out on
> [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:50002/),
> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
> of
> type "akka.actor.Identify"..*
>
> When TM started to have "Could not resolve ResourceManager", it cannot
> resolve itself until I restart the TM pod.
>
> *Here is the content of our flink-conf.yaml:*
> blob.server.port: 6124
> jobmanager.rpc.address: flink-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 4096
> jobmanager.web.history: 20
> jobmanager.archive.fs.dir: s3://our_path
> taskmanager.rpc.port: 6121
> taskmanager.heap.mb: 16384
> taskmanager.numberOfTaskSlots: 10
> taskmanager.log.path: /opt/flink/log/output.log
> web.log.path: /opt/flink/log/output.log
> state.checkpoints.num-retained: 3
> metrics.reporters: prom
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
>
> high-availability: zookeeper
> high-availability.jobmanager.port: 50002
> high-availability.zookeeper.quorum: zookeeper_instance_list
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: profileservice
> high-availability.storageDir: s3://our_path
>
> Any help will be greatly appreciated!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Rocksdb Metrics

2018-09-25 Thread Stefan Richter
Hi,

this feature is tracked here https://issues.apache.org/jira/browse/FLINK-10423 


Best,
Stefan

> On 25.09.2018 at 17:51, Sayat Satybaldiyev wrote:
> 
> Flink provides a rich number of metrics. However, I didn't find any metrics 
> for rocksdb state backend not in metrics doc nor in JMX Mbean. 
> 
> Is there are any metrics for the rocksdb backend that Flink exposes?



Re: How to join stream and batch data in Flink?

2018-09-25 Thread Hequn Cheng
Hi vino,

Thanks for sharing the link. It's a great book and I will take a look.
There are different kinds of joins, and different joins have different semantics.
From the link, I think it refers to the time-versioned join. FLINK-9712 enriches
joins with Time Versioned Functions, and the result is deterministic under event time.

Best, Hequn

On Tue, Sep 25, 2018 at 11:05 PM vino yang  wrote:

> Hi Hequn,
>
> The specific content of the book does not give a right or wrong
> conclusion, but it illustrates this phenomenon: two streams of the same
> input, playing and joining at the same time, due to the order of events,
> the connection results are uncertain. This is because the two streams are
> intertwined in different forms. This has nothing to do with orderby, just
> that it exists in the stream stream join. Of course, this phenomenon is
> only a comparison statement with a non-stream join.
>
> In addition, I recommend this book, which is very famous on Twitter and
> Amazon. Because you are also Chinese, there is a good translation here. If
> I guess it is correct, the main translator is also from your company. This
> part of what I mentioned is here.[1]
>
> [1]:
> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>
> Thanks, vino.
>
Hequn Cheng wrote on Tue, 25 Sep 2018, at 21:45:
>
>> Hi vino,
>>
>> There are no order problems of stream-stream join in Flink. No matter
>> what order the elements come, stream-stream join in Flink will output
>> results which consistent with standard SQL semantics. I haven't read the
>> book you mentioned. For join, it doesn't guarantee output orders. You have
>> to do orderBy if you want to get ordered results.
>>
>> Best, Hequn
>>
>> On Tue, Sep 25, 2018 at 8:36 PM vino yang  wrote:
>>
>>> Hi Fabian,
>>>
>>> I may not have stated it here, and there is no semantic problem at the
>>> Flink implementation level. Rather, there may be “Time-dependence” here. [1]
>>>
>>> Yes, my initial answer was not to use this form of join in this
>>> scenario, but Henry said he converted the table into a stream table and
>>> asked about the feasibility of other methods.
>>>
>>> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3:
>>> Derived Data, Chapter 11: Stream Processing , Stream Joins.
>>>
>>> some content :
>>>
>>> *If the ordering of events across streams is undetermined, the join
>>> becomes nondeter‐ ministic [87], which means you cannot rerun the same job
>>> on the same input and necessarily get the same result: the events on the
>>> input streams may be interleaved in a different way when you run the job
>>> again. *
>>>
>>>
>>> Fabian Hueske wrote on Tue, 25 Sep 2018, at 20:08:
>>>
 Hi,

 I don't think that using the current join implementation in the Table
 API / SQL will work.
 The non-windowed join fully materializes *both* input tables in state.
 This is necessary, because the join needs to be able to process updates on
 either side.
 While this is not a problem for the fixed sized MySQL table,
 materializing the append-only table (aka stream) is probably not what you
 want.
 You can also not limit idle state retention because it would remove the
 MySQL table from state at some point.

 The only way to make it work is using a user-defined TableFunction that
 queries the MySQL table via JDBC.
 However, please note that these calls would be synchronous, blocking
 calls.

 @Vino: Why do you think that the stream & stream join is not mature and
 which problems do you see in the semantics?
 The semantics are correct (standard SQL semantics) and in my opinion
 the implementation is also mature.
 However, you should not use the non-windowed join if any of the input
 tables is ever growing because both sides must be hold in state. This is
 not an issue of the semantics.

 Cheers,
 Fabian

 On Tue, 25 Sep 2018 at 14:00, vino yang <yanghua1...@gmail.com> wrote:

> Hi Henry,
>
> 1) I don't recommend this method very much, but you said that you
> expect to convert mysql table to stream and then to flink table. Under 
> this
> premise, I said that you can do this by joining two stream tables. But as
> you know, this join depends on the time period in which the state is 
> saved.
> To make it equivalent to a dimension table, you must permanently save the
> state of the stream table that is defined as a "dimension table." I just
> said that modifying the relevant configuration in Flink can do this, Not
> for a single table.
>
> 2) Imagine that there are one million records in two tables. The
> records in both tables are just beginning to stream into flink, and the
> records as dimension tables are not fully arrived. Therefore, your 
> matching
> results may not be as accurate a

Rocksdb Metrics

2018-09-25 Thread Sayat Satybaldiyev
Flink provides a rich set of metrics. However, I didn't find any metrics
for the RocksDB state backend, neither in the metrics documentation nor in the JMX MBeans.

Are there any metrics for the RocksDB backend that Flink exposes?


Re: ArrayIndexOutOfBoundsException

2018-09-25 Thread Alexander Smirnov
Appreciate your help, Stefan! 👍🏻
On Tue, 25 Sep 2018 at 18:19, Stefan Richter 
wrote:

> You only need to update the Flink jars; the job requires no update. I
> think you also cannot start from this checkpoint/savepoint after the
> upgrade because it seems to be corrupted by the bug. You need to use an
> older checkpoint to restart.
>
> Best,
> Stefan
>
>
> Am 25.09.2018 um 16:53 schrieb Alexander Smirnov <
> alexander.smirn...@gmail.com>:
>
> Thanks Stefan.
>
> is it only Flink runtime should be updated, or the job should be
> recompiled too?
> Is there a workaround to start the job without upgrading Flink?
>
> Alex
>
> On Tue, Sep 25, 2018 at 5:48 PM Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> this problem looks like https://issues.apache.org/jira/browse/FLINK-8836 
>> which
>> would also match to your Flink version. I suggest to update to 1.4.3 or
>> higher to avoid the issue in the future.
>>
>> Best,
>> Stefan
>>
>>
>> Am 25.09.2018 um 16:37 schrieb Alexander Smirnov <
>> alexander.smirn...@gmail.com>:
>>
>> I'm getting an exception on job starting from a savepoint. Why that could
>> happen?
>>
>> Flink 1.4.2
>>
>>
>> java.lang.IllegalStateException: Could not initialize operator state
>> backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at
>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 more
>>
>>
>>
>


Re: 1.5 Checkpoint metadata location

2018-09-25 Thread Gyula Fóra
Yes, the only workaround I found at the end was to restore the previous
behavior where metadata files are written separately.

But for this you need a custom Flink build with the changes to the
checkpointing logic.

Gyula
On Tue, 25 Sep 2018 at 16:45, Till Rohrmann  wrote:

> Hi Bryant,
>
> I think if you explicitly define the StateBackend in your code (calling
> StreamExecutionEnvironment#setStateBackend), then you also define the
> checkpointing directory when calling the StateBackend's constructor. This
> is also the directory in which the metadata files are stored. You could
> also say that the job specific settings overwrite the cluster settings
> specified in the flink-conf.yaml.
>
> I think the workaround was to change some Flink code and build a custom
> version. For more details, you have to ask Gyula.
>
> Cheers,
> Till
>
> On Tue, Sep 25, 2018 at 4:59 AM vino yang  wrote:
>
>> Hi Bryant,
>>
>> Maybe Stefan can answer your question, ping him for you.
>>
>> Thanks, vino.
>>
>> Bryant Baltes  于2018年9月25日周二 上午12:29写道:
>>
>>> Hi All,
>>>
>>> After upgrading from 1.3.2 to 1.5.2, one of our apps that uses
>>> checkpointing no longer writes metadata files to the state.checkpoints.dir
>>> location provided to the flink conf.  I see this email chain addressed this
>>> here:
>>> https://lists.apache.org/thread.html/922f77880eca2a7b279e153090da2388b54f19e89528a2a35937d9a8@%3Cuser.flink.apache.org%3E
>>> .
>>>
>>> I am still a bit unclear what the workaround is.  We use the metadata
>>> files to recover when the app goes down or gets restarted.
>>>
>>> Thanks,
>>>
>>> Bryant
>>>
>>


Re: ArrayIndexOutOfBoundsException

2018-09-25 Thread Stefan Richter
You only need to update the Flink jars; the job requires no update. I think you 
also cannot start from this checkpoint/savepoint after the upgrade because it 
seems to be corrupted by the bug. You need to use an older checkpoint to restart.

Best,
Stefan

> On 25.09.2018 at 16:53, Alexander Smirnov wrote:
> 
> Thanks Stefan.
> 
> is it only Flink runtime should be updated, or the job should be recompiled 
> too?
> Is there a workaround to start the job without upgrading Flink?
> 
> Alex
> 
> On Tue, Sep 25, 2018 at 5:48 PM Stefan Richter  > wrote:
> Hi,
> 
> this problem looks like https://issues.apache.org/jira/browse/FLINK-8836 
>  which would also match to 
> your Flink version. I suggest to update to 1.4.3 or higher to avoid the issue 
> in the future.
> 
> Best,
> Stefan
> 
> 
>> On 25.09.2018 at 16:37, Alexander Smirnov <alexander.smirn...@gmail.com> wrote:
>> 
>> I'm getting an exception on job starting from a savepoint. Why that could 
>> happen?
>> 
>> Flink 1.4.2
>> 
>> 
>> java.lang.IllegalStateException: Could not initialize operator state backend.
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 more
> 



Re: How to join stream and batch data in Flink?

2018-09-25 Thread vino yang
Hi Hequn,

The book does not give a right-or-wrong conclusion, but it illustrates this
phenomenon: given the same two input streams, replayed and joined at the same
time, the join results can be nondeterministic because of the order of events,
since the two streams may be interleaved in different ways. This has nothing to
do with ORDER BY; it is simply inherent to the stream-stream join, in comparison
with a non-stream join.

In addition, I recommend this book, which is very famous on Twitter and
Amazon. Because you are also Chinese, there is a good translation here. If
I guess it is correct, the main translator is also from your company. This
part of what I mentioned is here.[1]

[1]:
https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7

Thanks, vino.

Hequn Cheng wrote on Tue, 25 Sep 2018, at 21:45:

> Hi vino,
>
> There are no order problems of stream-stream join in Flink. No matter what
> order the elements come, stream-stream join in Flink will output results
> which consistent with standard SQL semantics. I haven't read the book you
> mentioned. For join, it doesn't guarantee output orders. You have to do
> orderBy if you want to get ordered results.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 8:36 PM vino yang  wrote:
>
>> Hi Fabian,
>>
>> I may not have stated it here, and there is no semantic problem at the
>> Flink implementation level. Rather, there may be “Time-dependence” here. [1]
>>
>> Yes, my initial answer was not to use this form of join in this scenario,
>> but Henry said he converted the table into a stream table and asked about
>> the feasibility of other methods.
>>
>> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3:
>> Derived Data, Chapter 11: Stream Processing , Stream Joins.
>>
>> some content :
>>
>> *If the ordering of events across streams is undetermined, the join
>> becomes nondeter‐ ministic [87], which means you cannot rerun the same job
>> on the same input and necessarily get the same result: the events on the
>> input streams may be interleaved in a different way when you run the job
>> again. *
>>
>>
>> Fabian Hueske wrote on Tue, 25 Sep 2018, at 20:08:
>>
>>> Hi,
>>>
>>> I don't think that using the current join implementation in the Table
>>> API / SQL will work.
>>> The non-windowed join fully materializes *both* input tables in state.
>>> This is necessary, because the join needs to be able to process updates on
>>> either side.
>>> While this is not a problem for the fixed sized MySQL table,
>>> materializing the append-only table (aka stream) is probably not what you
>>> want.
>>> You can also not limit idle state retention because it would remove the
>>> MySQL table from state at some point.
>>>
>>> The only way to make it work is using a user-defined TableFunction that
>>> queries the MySQL table via JDBC.
>>> However, please note that these calls would be synchronous, blocking
>>> calls.
>>>
>>> @Vino: Why do you think that the stream & stream join is not mature and
>>> which problems do you see in the semantics?
>>> The semantics are correct (standard SQL semantics) and in my opinion the
>>> implementation is also mature.
>>> However, you should not use the non-windowed join if any of the input
>>> tables is ever growing because both sides must be hold in state. This is
>>> not an issue of the semantics.
>>>
>>> Cheers,
>>> Fabian
>>>
 On Tue, 25 Sep 2018 at 14:00, vino yang <yanghua1...@gmail.com> wrote:
>>>
 Hi Henry,

 1) I don't recommend this method very much, but you said that you
 expect to convert mysql table to stream and then to flink table. Under this
 premise, I said that you can do this by joining two stream tables. But as
 you know, this join depends on the time period in which the state is saved.
 To make it equivalent to a dimension table, you must permanently save the
 state of the stream table that is defined as a "dimension table." I just
 said that modifying the relevant configuration in Flink can do this, Not
 for a single table.

 2) Imagine that there are one million records in two tables. The
 records in both tables are just beginning to stream into flink, and the
 records as dimension tables are not fully arrived. Therefore, your matching
 results may not be as accurate as directly querying Mysql.

 In fact, the current stream & stream join is not very mature, there are
 some problems in semantics, I personally recommend that you return to
 stream/batch (mysql) join. For more principle content, I recommend you read
 a book, referred to as 《DDIA》.

 Thanks, vino.

 徐涛 wrote on Tue, 25 Sep 2018, at 17:48:

> Hi Vino,
> I do not quite understand in some sentences below, would you please
> help explain it a bit more detailedly?
> 1. “*such as setting the state retention time of one of the tab

Re: ArrayIndexOutOfBoundsException

2018-09-25 Thread Alexander Smirnov
Thanks Stefan.

is it only Flink runtime should be updated, or the job should be recompiled
too?
Is there a workaround to start the job without upgrading Flink?

Alex

On Tue, Sep 25, 2018 at 5:48 PM Stefan Richter 
wrote:

> Hi,
>
> this problem looks like https://issues.apache.org/jira/browse/FLINK-8836 which
> would also match to your Flink version. I suggest to update to 1.4.3 or
> higher to avoid the issue in the future.
>
> Best,
> Stefan
>
>
> Am 25.09.2018 um 16:37 schrieb Alexander Smirnov <
> alexander.smirn...@gmail.com>:
>
> I'm getting an exception on job starting from a savepoint. Why that could
> happen?
>
> Flink 1.4.2
>
>
> java.lang.IllegalStateException: Could not initialize operator state
> backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 more
>
>
>


Re: ArrayIndexOutOfBoundsException

2018-09-25 Thread Stefan Richter
Hi,

this problem looks like https://issues.apache.org/jira/browse/FLINK-8836 
 which would also match to 
your Flink version. I suggest to update to 1.4.3 or higher to avoid the issue 
in the future.

Best,
Stefan

> Am 25.09.2018 um 16:37 schrieb Alexander Smirnov 
> :
> 
> I'm getting an exception on job starting from a savepoint. Why that could 
> happen?
> 
> Flink 1.4.2
> 
> 
> java.lang.IllegalStateException: Could not initialize operator state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 more



Re: Scheduling sources

2018-09-25 Thread Till Rohrmann
Hi Averell,

such a feature is currently not supported by Flink. The scheduling works by
starting all sources at the same time. Depending whether it is a batch or
streaming job, you either start deploying consumers once producers have
produced some results or right away.

Cheers,
Till

On Tue, Sep 25, 2018 at 8:16 AM Averell  wrote:

> Hi everyone,
>
> I have 2 file sources, which I want to start reading in a specified
> order (e.g: source2 should only start 5 minutes after source1 has started).
> I could not find any Flink document mentioning this capability, and I also
> tried to search the mailing list, without any success.
> However, on Flink GUI there's a Timeline tab which shows start-time of each
> operator. And this gives me a hope that there is something that can help
> with my requirement.
> (
> http://localhost:20888/proxy/application_1537700592704_0026/#/jobs/0360094da093e36299273329f9dec19d/timeline
> )
>
> Could you please help give some help?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: 1.5 Checkpoint metadata location

2018-09-25 Thread Till Rohrmann
Hi Bryant,

I think if you explicitly define the StateBackend in your code (calling
StreamExecutionEnvironment#setStateBackend), then you also define the
checkpointing directory when calling the StateBackend's constructor. This
is also the directory in which the metadata files are stored. You could
also say that the job specific settings overwrite the cluster settings
specified in the flink-conf.yaml.
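
As an illustration (a minimal sketch, not code from this thread; the path is made up),
setting the backend in the job makes the constructor argument the place where both the
checkpoint data and the _metadata files are written, overriding state.checkpoints.dir:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// job-level setting: this directory wins over the cluster-wide state.checkpoints.dir
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/my-job"));
env.enableCheckpointing(60_000);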

I think the workaround was to change some Flink code and build a custom
version. For more details, you have to ask Gyula.

Cheers,
Till

On Tue, Sep 25, 2018 at 4:59 AM vino yang  wrote:

> Hi Bryant,
>
> Maybe Stefan can answer your question, ping him for you.
>
> Thanks, vino.
>
> Bryant Baltes  于2018年9月25日周二 上午12:29写道:
>
>> Hi All,
>>
>> After upgrading from 1.3.2 to 1.5.2, one of our apps that uses
>> checkpointing no longer writes metadata files to the state.checkpoints.dir
>> location provided to the flink conf.  I see this email chain addressed this
>> here:
>> https://lists.apache.org/thread.html/922f77880eca2a7b279e153090da2388b54f19e89528a2a35937d9a8@%3Cuser.flink.apache.org%3E
>> .
>>
>> I am still a bit unclear what the workaround is.  We use the metadata
>> files to recover when the app goes down or gets restarted.
>>
>> Thanks,
>>
>> Bryant
>>
>


ArrayIndexOutOfBoundsException

2018-09-25 Thread Alexander Smirnov
I'm getting an exception when the job starts from a savepoint. Why could that
happen?

Flink 1.4.2


java.lang.IllegalStateException: Could not initialize operator state
backend.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
... 6 more


Re: Flink TaskManagers do not start until job is submitted in YARN

2018-09-25 Thread Till Rohrmann
With Flink 1.5.x and 1.6.x you can put `mode: legacy` into your
flink-conf.yaml and it will start the old mode. Then you have the old
behaviour.
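
For reference, the switch is a single entry in the cluster configuration (sketch; the
"mode" option only exists on the 1.5.x/1.6.x lines, where "new" is the default):

# flink-conf.yaml
mode: legacy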

What do you mean with total slots? The current number of total slots? With
resource elasticity this number can of course change because if you don't
have enough slots, then Flink will try to start a new TaskExecutor.

Cheers,
Till

On Mon, Sep 24, 2018 at 7:11 PM suraj7  wrote:

> Thanks for the clarification, Dawid and Till.
>
> @Till We have a few streaming jobs that need to be running all the time and
> we plan on using the modify tool to update parallelism of jobs as we scale
> the cluster in and out and knowing total slots value is crucial to this
> workflow.
>
> As Dawid pointed out, is there a switch to restore the old behavior?
> If not, is there a way to find/predict total slots value from YARN metrics?
> Are you aware of any such workflow?
>
> Thanks,
> Suraj
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Information required regarding SSL algorithms for Flink 1.5.x

2018-09-25 Thread Till Rohrmann
Flink 1.5.x supports the same set of algorithms as does Flink 1.6. However,
it mainly depends on the used Java version which algorithms you can use.
There are certain Java versions which don't support all cipher suites. See
https://issues.apache.org/jira/browse/FLINK-9424 for example.

Cheers,
Till

On Mon, Sep 24, 2018 at 12:29 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hello,
>
>
>
> We have a query regarding SSL algorithms available for Flink versions.
> From the documents of Flink 1.6.0 we could see following SSL algorithms
> options are supported.
>
>
>
> security.ssl.algorithms:
> TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
>
> Please let us know if all these options are supported in Flink 1.5.x
> releases as well.
>
>
>
> Thanks,
>
> Suchithra
>
>
>
>
>


Re: Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch

2018-09-25 Thread bastien dine
Hi Hequn,

Thanks for your response
Yeah, I know about the Table API, but I am looking for a way to have a bounded
context with a stream, somehow creating a dataset from a buffer stored in a
window of a datastream.

Regards, Bastien

Le mar. 25 sept. 2018 à 14:50, Hequn Cheng  a écrit :

> Hi bastien,
>
> Flink features two relational APIs, the Table API and SQL. Both APIs are
> unified APIs for batch and stream processing, i.e., queries are executed
> with the same semantics on unbounded, real-time streams or bounded[1].
> There are also documents about Join[2].
>
> Best, Hequn
> [1] https://flink.apache.org/flink-applications.html#layered-apis
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
> On Tue, Sep 25, 2018 at 4:14 PM bastien dine 
> wrote:
>
>> Hello everyone,
>>
>> I need to join some files to perform some processing.. The dataset API is
>> a perfect way to achieve this, I am able to do it when I read file in batch
>> (csv)
>>
>> However in the prod environment, I will receive thoses files in kafka
>> messages (one message = one line of a file)
>> So I am considering using a global window + a custom trigger on a end of
>> file message and a process window function.
>> But I can not go too far with that as process is only one function and
>> chaining functions will be a pain. I don't think that emitting a datastream
>> & windows / trigger on EOF before every process function is a good idea
>>
>> However I would like to work in a bounded way once I received all of my
>> elements (after the trigger on global window), like the dataset API, as I
>> will join on my whole dataset..
>>
>> I thought maybe it would be a good idea to go for table API and group
>> window ? but you can not have custom trigger and a global group window on a
>> table ?(like the global window on datastream ?)
>> Best alternative would be to create a dataset as a result of my process
>> window function.. but I don't think this is possible, is it ?
>>
>> Best Regards,
>> Bastien
>>
>


Re: Get last element of a DataSet

2018-09-25 Thread Alejandro Alcalde
Yes, of course

A IDA Discretization
[info]   When computing its discretization
[info]   - Should be computed correctly *** FAILED ***
[info] org.apache.flink.runtime.client.JobExecutionException:
java.lang.Exception: The user defined 'open()' method caused an exception: Index: 0, Size: 0

[info] at
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
[info] at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:235)
[info] at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
[info] at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
[info] at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:525)
[info] at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:601)
[info] at
com.elbauldelprogramador.discretizers.IDADiscretizerTransformerSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(IDADiscretizerTransformerSpec.scala:56)

[info] at
com.elbauldelprogramador.discretizers.IDADiscretizerTransformerSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(IDADiscretizerTransformerSpec.scala:48)

[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] ...
[info] Cause: java.lang.Exception: The user defined 'open()' method
caused an exception: Index: 0, Size: 0
[info] at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
[info] at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
[info] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
[info] at java.lang.Thread.run(Thread.java:748)
[info] ...
[info] Cause: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
[info] at java.util.ArrayList.rangeCheck(ArrayList.java:657)
[info] at java.util.ArrayList.get(ArrayList.java:433)
[info] at
org.apache.flink.ml.package$BroadcastSingleElementMapper.open(package.scala:82)
[info] at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
[info] at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
[info] at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
[info] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
[info] at java.lang.Thread.run(Thread.java:748)


It seems the problem is not in the code above itself, but is caused by it,
since cuts ends up empty.

-- Alejandro Alcalde - elbauldelprogramador.com


On Tue, Sep 25, 2018 at 1:33 PM Fabian Hueske  wrote:

> Hi,
>
> Can you post the full stacktrace?
>
> Thanks, Fabian
>
> Am Di., 25. Sep. 2018 um 12:55 Uhr schrieb Alejandro Alcalde <
> algu...@gmail.com>:
>
>> Hi,
>>
>> I am trying to improve the efficiency of this code:
>>
>> discretized.map(_._2)
>>   .name("Map V")
>>   .reduce((_, b) ⇒ b)
>>   .name("Get Last V")
>>
>> I am just interested in the last element of discretized.
>>
>> I've seen this SO question:
>> https://stackoverflow.com/questions/45076310/how-to-get-only-the-last-element-of-the-large-dataset-in-flink
>>
>> I've tried with this code:
>>
>> discretized.mapPartition(in ⇒ if (!in.hasNext) in.map(_._2) else Seq())
>>   .setParallelism(1)
>>   .name("Get Last V")
>>
>> But I am getting this error:
>>
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>>
>> Best
>>
>> -- Alejandro Alcalde - elbauldelprogramador.com
>>
>


Re: How to join stream and batch data in Flink?

2018-09-25 Thread Hequn Cheng
Hi vino,

There are no order problems of stream-stream join in Flink. No matter what
order the elements come, stream-stream join in Flink will output results
which consistent with standard SQL semantics. I haven't read the book you
mentioned. For join, it doesn't guarantee output orders. You have to do
orderBy if you want to get ordered results.

Best, Hequn

On Tue, Sep 25, 2018 at 8:36 PM vino yang  wrote:

> Hi Fabian,
>
> I may not have stated it here, and there is no semantic problem at the
> Flink implementation level. Rather, there may be “Time-dependence” here. [1]
>
> Yes, my initial answer was not to use this form of join in this scenario,
> but Henry said he converted the table into a stream table and asked about
> the feasibility of other methods.
>
> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3:
> Derived Data, Chapter 11: Stream Processing , Stream Joins.
>
> some content :
>
> *If the ordering of events across streams is undetermined, the join
> becomes nondeter‐ ministic [87], which means you cannot rerun the same job
> on the same input and necessarily get the same result: the events on the
> input streams may be interleaved in a different way when you run the job
> again. *
>
>
> Fabian Hueske  于2018年9月25日周二 下午8:08写道:
>
>> Hi,
>>
>> I don't think that using the current join implementation in the Table API
>> / SQL will work.
>> The non-windowed join fully materializes *both* input tables in state.
>> This is necessary, because the join needs to be able to process updates on
>> either side.
>> While this is not a problem for the fixed sized MySQL table,
>> materializing the append-only table (aka stream) is probably not what you
>> want.
>> You can also not limit idle state retention because it would remove the
>> MySQL table from state at some point.
>>
>> The only way to make it work is using a user-defined TableFunction that
>> queries the MySQL table via JDBC.
>> However, please note that these calls would be synchronous, blocking
>> calls.
>>
>> @Vino: Why do you think that the stream & stream join is not mature and
>> which problems do you see in the semantics?
>> The semantics are correct (standard SQL semantics) and in my opinion the
>> implementation is also mature.
>> However, you should not use the non-windowed join if any of the input
>> tables is ever growing because both sides must be hold in state. This is
>> not an issue of the semantics.
>>
>> Cheers,
>> Fabian
>>
>> Am Di., 25. Sep. 2018 um 14:00 Uhr schrieb vino yang <
>> yanghua1...@gmail.com>:
>>
>>> Hi Henry,
>>>
>>> 1) I don't recommend this method very much, but you said that you expect
>>> to convert mysql table to stream and then to flink table. Under this
>>> premise, I said that you can do this by joining two stream tables. But as
>>> you know, this join depends on the time period in which the state is saved.
>>> To make it equivalent to a dimension table, you must permanently save the
>>> state of the stream table that is defined as a "dimension table." I just
>>> said that modifying the relevant configuration in Flink can do this, Not
>>> for a single table.
>>>
>>> 2) Imagine that there are one million records in two tables. The records
>>> in both tables are just beginning to stream into flink, and the records as
>>> dimension tables are not fully arrived. Therefore, your matching results
>>> may not be as accurate as directly querying Mysql.
>>>
>>> In fact, the current stream & stream join is not very mature, there are
>>> some problems in semantics, I personally recommend that you return to
>>> stream/batch (mysql) join. For more principle content, I recommend you read
>>> a book, referred to as 《DDIA》.
>>>
>>> Thanks, vino.
>>>
>>> 徐涛  于2018年9月25日周二 下午5:48写道:
>>>
 Hi Vino,
 I do not quite understand in some sentences below, would you please
 help explain it a bit more detailedly?
 1. “*such as setting the state retention time of one of the tables to
 be permanent*” , as I know, the state retention time is a global
 config, I can not set this property per table.
 2. "*you may not be able to match the results, because the data
 belonging to the mysql table is just beginning to play as a stream*”
  Why it is not able to match the results?

 Best
 Henry

 在 2018年9月25日,下午5:29,vino yang  写道:

 Hi Henry,

 If you have converted the mysql table to a flink stream table. In flink
 table/sql, streams and stream joins can also do this, such as setting the
 state retention time of one of the tables to be permanent. But when the job
 is just running, you may not be able to match the results, because the data
 belonging to the mysql table is just beginning to play as a stream.

 Thanks, vino.

 徐涛  于2018年9月25日周二 下午5:10写道:

> Hi Vino & Hequn,
> I am now using the table/sql API, if I import the mysql table as a
> stream then convert it into a table, it seems that it can also b

Re: Null Flink State

2018-09-25 Thread Taher Koitawala
Hi Dawid,
  Thanks for the answer. How do I get the state of the window
then? I do understand that elements are going into state, as the window
itself is a stateful operator. How do I get access to those elements?

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


On Tue, Sep 25, 2018 at 6:51 PM Dawid Wysakowicz 
wrote:

> Hi Taher,
>
> As long as you don't put something into the state ValueState#value() will
> return null. The point for having ctx.globalState(1) and ctx.windowState(2)
> is to allow users to store some their own state, scoped to key(1) and  key
> & window(2) accordingly. If you want to access all elements assigned to
> that window you can iterate over them with the "itr" in your example.
>
> Best,
>
> Dawid
>
> On 25/09/18 15:07, Taher Koitawala wrote:
>
> Hi All,
>I am trying to access elements stored in the state of the
> window. As window, itself is a stateful operator I think I should be able
> to get records in the process function after the is triggered. Can someone
> tell me why in the following code is the state of the window null?
>
> Below is a mocked piece of code we are using. Am I doing something wrong
> here?
>
> env.enableCheckpointing(2L);
> env.setStateBackend(new FsStateBackend("path"));
> DataStream stream1 =env.addSource(new FlinkKafkaConsumer);
> DataStream stream2 =env.addSource(new FlinkKafkaConsumer);
>
> stream1.union(stream2)
> .keyBy()
> .timeWindow(Time.milliseconds((30L))
> .allowedLateness(Time.minutes(1))
> .process(new ProcessWindowFunction()
> {
> public void process(T t, ProcessWindowFunction.Context ctx, Iterable
> itr, Collectorcollector)
> KeyedStateStore globalState = ctx.globalState();
> ValueState>
> valueState = ctx.globalState().getState(new
> ValueStateDescriptor<>("valueState", TypeInformation.of(new TypeHint()
> {})));
>
> System.out.println(valueState.value());
> collector.collect(T)
> })
>
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>
>
>


Re: Null Flink State

2018-09-25 Thread Dawid Wysakowicz
Hi Taher,

As long as you don't put something into the state ValueState#value()
will return null. The point for having ctx.globalState(1) and
ctx.windowState(2) is to allow users to store some their own state,
scoped to key(1) and  key & window(2) accordingly. If you want to access
all elements assigned to that window you can iterate over them with the
"itr" in your example.

Best,

Dawid


On 25/09/18 15:07, Taher Koitawala wrote:
> Hi All,
>            I am trying to access elements stored in the state of the
> window. As window, itself is a stateful operator I think I should be
> able to get records in the process function after the is triggered.
> Can someone tell me why in the following code is the state of the
> window null? 
>
> Below is a mocked piece of code we are using. Am I doing something
> wrong here?
>
> env.enableCheckpointing(2L);
> env.setStateBackend(new FsStateBackend("path"));
> DataStream stream1 =env.addSource(new FlinkKafkaConsumer);
> DataStream stream2 =env.addSource(new FlinkKafkaConsumer);
>
> stream1.union(stream2)
> .keyBy()
> .timeWindow(Time.milliseconds((30L))
> .allowedLateness(Time.minutes(1))
> .process(new ProcessWindowFunction()
> {
> public void process(T t, ProcessWindowFunction.Context ctx,
> Iterable itr, Collectorcollector)
> KeyedStateStore globalState = ctx.globalState();
> ValueState>
> valueState = ctx.globalState().getState(new
> ValueStateDescriptor<>("valueState", TypeInformation.of(new
> TypeHint() {})));
>
> System.out.println(valueState.value());
> collector.collect(T)
> })
>
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163



signature.asc
Description: OpenPGP digital signature


Null Flink State

2018-09-25 Thread Taher Koitawala
Hi All,
   I am trying to access elements stored in the state of the
window. As the window itself is a stateful operator, I think I should be able
to get records in the process function after the window is triggered. Can someone
tell me why the state of the window is null in the following code?

Below is a mocked piece of code we are using. Am I doing something wrong
here?

env.enableCheckpointing(2L);
env.setStateBackend(new FsStateBackend("path"));
DataStream stream1 =env.addSource(new FlinkKafkaConsumer);
DataStream stream2 =env.addSource(new FlinkKafkaConsumer);

stream1.union(stream2)
.keyBy()
.timeWindow(Time.milliseconds((30L))
.allowedLateness(Time.minutes(1))
.process(new ProcessWindowFunction()
{
public void process(T t, ProcessWindowFunction.Context ctx, Iterable
itr, Collectorcollector)
KeyedStateStore globalState = ctx.globalState();
ValueState> valueState
= ctx.globalState().getState(new ValueStateDescriptor<>("valueState",
TypeInformation.of(new TypeHint() {})));

System.out.println(valueState.value());
collector.collect(T)
})



Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


Re: Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch

2018-09-25 Thread Hequn Cheng
Hi bastien,

Flink features two relational APIs, the Table API and SQL. Both APIs are
unified APIs for batch and stream processing, i.e., queries are executed
with the same semantics on unbounded, real-time streams or bounded[1].
There are also documents about Join[2].

Best, Hequn
[1] https://flink.apache.org/flink-applications.html#layered-apis
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
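
To make the "unified" point concrete for the file-join use case, a small sketch (not part
of the reply; table and field names are invented) that joins two registered streams with
plain SQL. The same SQL text would also run on a BatchTableEnvironment over DataSets:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class UnifiedSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // stand-ins for the two Kafka-backed "files" (one message = one line)
        DataStream<Tuple2<Integer, String>> fileA = env.fromElements(Tuple2.of(1, "a1"), Tuple2.of(2, "a2"));
        DataStream<Tuple2<Integer, String>> fileB = env.fromElements(Tuple2.of(1, "b1"), Tuple2.of(2, "b2"));

        tEnv.registerDataStream("FileA", fileA, "id, valA");
        tEnv.registerDataStream("FileB", fileB, "id, valB");

        Table joined = tEnv.sqlQuery(
            "SELECT a.id, a.valA, b.valB FROM FileA a JOIN FileB b ON a.id = b.id");

        tEnv.toRetractStream(joined, Row.class).print();
        env.execute("unified-sql-sketch");
    }
}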

On Tue, Sep 25, 2018 at 4:14 PM bastien dine  wrote:

> Hello everyone,
>
> I need to join some files to perform some processing.. The dataset API is
> a perfect way to achieve this, I am able to do it when I read file in batch
> (csv)
>
> However in the prod environment, I will receive thoses files in kafka
> messages (one message = one line of a file)
> So I am considering using a global window + a custom trigger on a end of
> file message and a process window function.
> But I can not go too far with that as process is only one function and
> chaining functions will be a pain. I don't think that emitting a datastream
> & windows / trigger on EOF before every process function is a good idea
>
> However I would like to work in a bounded way once I received all of my
> elements (after the trigger on global window), like the dataset API, as I
> will join on my whole dataset..
>
> I thought maybe it would be a good idea to go for table API and group
> window ? but you can not have custom trigger and a global group window on a
> table ?(like the global window on datastream ?)
> Best alternative would be to create a dataset as a result of my process
> window function.. but I don't think this is possible, is it ?
>
> Best Regards,
> Bastien
>


Re: How to join stream and batch data in Flink?

2018-09-25 Thread vino yang
Hi Fabian,

I may not have stated it here, and there is no semantic problem at the
Flink implementation level. Rather, there may be “Time-dependence” here. [1]

Yes, my initial answer was not to use this form of join in this scenario,
but Henry said he converted the table into a stream table and asked about
the feasibility of other methods.

[1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3:
Derived Data, Chapter 11: Stream Processing , Stream Joins.

some content :

*If the ordering of events across streams is undetermined, the join becomes
nondeter‐ ministic [87], which means you cannot rerun the same job on the
same input and necessarily get the same result: the events on the input
streams may be interleaved in a different way when you run the job again. *


Fabian Hueske  于2018年9月25日周二 下午8:08写道:

> Hi,
>
> I don't think that using the current join implementation in the Table API
> / SQL will work.
> The non-windowed join fully materializes *both* input tables in state.
> This is necessary, because the join needs to be able to process updates on
> either side.
> While this is not a problem for the fixed sized MySQL table, materializing
> the append-only table (aka stream) is probably not what you want.
> You can also not limit idle state retention because it would remove the
> MySQL table from state at some point.
>
> The only way to make it work is using a user-defined TableFunction that
> queries the MySQL table via JDBC.
> However, please note that these calls would be synchronous, blocking calls.
>
> @Vino: Why do you think that the stream & stream join is not mature and
> which problems do you see in the semantics?
> The semantics are correct (standard SQL semantics) and in my opinion the
> implementation is also mature.
> However, you should not use the non-windowed join if any of the input
> tables is ever growing because both sides must be hold in state. This is
> not an issue of the semantics.
>
> Cheers,
> Fabian
>
> Am Di., 25. Sep. 2018 um 14:00 Uhr schrieb vino yang <
> yanghua1...@gmail.com>:
>
>> Hi Henry,
>>
>> 1) I don't recommend this method very much, but you said that you expect
>> to convert mysql table to stream and then to flink table. Under this
>> premise, I said that you can do this by joining two stream tables. But as
>> you know, this join depends on the time period in which the state is saved.
>> To make it equivalent to a dimension table, you must permanently save the
>> state of the stream table that is defined as a "dimension table." I just
>> said that modifying the relevant configuration in Flink can do this, Not
>> for a single table.
>>
>> 2) Imagine that there are one million records in two tables. The records
>> in both tables are just beginning to stream into flink, and the records as
>> dimension tables are not fully arrived. Therefore, your matching results
>> may not be as accurate as directly querying Mysql.
>>
>> In fact, the current stream & stream join is not very mature, there are
>> some problems in semantics, I personally recommend that you return to
>> stream/batch (mysql) join. For more principle content, I recommend you read
>> a book, referred to as 《DDIA》.
>>
>> Thanks, vino.
>>
>> 徐涛  于2018年9月25日周二 下午5:48写道:
>>
>>> Hi Vino,
>>> I do not quite understand in some sentences below, would you please help
>>> explain it a bit more detailedly?
>>> 1. “*such as setting the state retention time of one of the tables to
>>> be permanent*” , as I know, the state retention time is a global
>>> config, I can not set this property per table.
>>> 2. "*you may not be able to match the results, because the data
>>> belonging to the mysql table is just beginning to play as a stream*”
>>>  Why it is not able to match the results?
>>>
>>> Best
>>> Henry
>>>
>>> 在 2018年9月25日,下午5:29,vino yang  写道:
>>>
>>> Hi Henry,
>>>
>>> If you have converted the mysql table to a flink stream table. In flink
>>> table/sql, streams and stream joins can also do this, such as setting the
>>> state retention time of one of the tables to be permanent. But when the job
>>> is just running, you may not be able to match the results, because the data
>>> belonging to the mysql table is just beginning to play as a stream.
>>>
>>> Thanks, vino.
>>>
>>> 徐涛  于2018年9月25日周二 下午5:10写道:
>>>
 Hi Vino & Hequn,
 I am now using the table/sql API, if I import the mysql table as a
 stream then convert it into a table, it seems that it can also be a
 workaround for batch/streaming joining. May I ask what is the difference
 between the UDTF method? Does this implementation has some defects?
 Best
 Henry

 在 2018年9月22日,上午10:28,Hequn Cheng  写道:

 Hi

 +1 for vino's answer.
 Also, this kind of join will be supported in FLINK-9712
 . You can check more
 details in the jira.

 Best, Hequn

 On Fri, Sep 21, 2018 at 4:51 PM vino yang 
 wrote:

> Hi Henry,
>>

Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-25 Thread Averell
Thank you Kostas for spending time on my case.

Relating to the issue I mentioned, I have another issue caused by having a
lot of files to list. From the error msg, I understand that the listing was
taking more than 30s, and the JM thought that it hung and killed it. Is it
possible to increase this 30s timer?

Thanks and regards,
Averell


2018-09-25 12:01:13.222 [Canceler/Interrupts for Source: Custom File Source
(1/1) (a5f5434070044510eafc9103bc24af43).] WARN 
org.apache.flink.runtime.taskmanager.Task  - Task 'Source: Custom File
Source (1/1)' did not react to cancelling signal for 30 seconds, but is
stuck in method:
 java.net.URI$Parser.scan(URI.java:2998)
java.net.URI$Parser.parseAuthority(URI.java:3138)
java.net.URI$Parser.parseHierarchical(URI.java:3097)
java.net.URI$Parser.parse(URI.java:3053)
java.net.URI.<init>(URI.java:746)
org.apache.hadoop.fs.Path.makeQualified(Path.java:467)
org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:464)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem$$Lambda$63/515305348.apply(Unknown
Source)
com.amazon.ws.emr.hadoop.fs.s3n.BasicFileStatusFactory.newFile(BasicFileStatusFactory.java:69)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.newFile(S3NativeFileSystem.java:1154)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:962)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:914)
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:364)
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:97)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:395)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:327)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:292)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Should Queryable State Server be listening on 127.0.1.1?

2018-09-25 Thread Andrew Kowpak
I'm running into an issue where I am starting a standalone flink cluster in
an lxc container.  When my TaskManager starts up, the queryable state proxy
starts listening on 127.0.1.1:9069.  Attempting to connect to that port
from outside the container fails.  I'm totally willing to believe this is a
configuration problem within my container, so, I just wanted to verify that
it was expected behaviour to listen on that IP address.  As far as I can
tell, when the TaskManagerRunner creates an RpcService, it finds the task
manager address in ConnectionUtils.findConnectingAddress by:

(1) Using AkkaUtils.getInetSocketAddressFromAkkaURL to find the target
address (this resolves to 127.0.0.1)
(2) Uses the LOCAL_HOST address detection strategy to find the proper
address, this calls InetAddress.getLocalHost which resolves to 127.0.1.1
(as per the default /etc/hosts file on the container)
(3) Determines that a connection can be made from 127.0.1.1 to 127.0.0.1,
so uses 127.0.1.1 as the task manager address.

If you can let me know whether this is the intended behaviour, that would be
great.  If you have any suggestions as to how I can connect to the server
from outside my container, that would also be great.

Thanks.

-- 
*Andrew Kowpak P.Eng* *Sr. Software Engineer*
(519)  489 2688 | SSIMWAVE Inc.
402-140 Columbia Street West, Waterloo ON


Re: How to join stream and batch data in Flink?

2018-09-25 Thread Fabian Hueske
Hi,

I don't think that using the current join implementation in the Table API /
SQL will work.
The non-windowed join fully materializes *both* input tables in state. This
is necessary, because the join needs to be able to process updates on
either side.
While this is not a problem for the fixed sized MySQL table, materializing
the append-only table (aka stream) is probably not what you want.
You can also not limit idle state retention because it would remove the
MySQL table from state at some point.

The only way to make it work is using a user-defined TableFunction that
queries the MySQL table via JDBC.
However, please note that these calls would be synchronous, blocking calls.

@Vino: Why do you think that the stream & stream join is not mature and
which problems do you see in the semantics?
The semantics are correct (standard SQL semantics) and in my opinion the
implementation is also mature.
However, you should not use the non-windowed join if any of the input
tables is ever growing because both sides must be hold in state. This is
not an issue of the semantics.

Cheers,
Fabian

Am Di., 25. Sep. 2018 um 14:00 Uhr schrieb vino yang :

> Hi Henry,
>
> 1) I don't recommend this method very much, but you said that you expect
> to convert mysql table to stream and then to flink table. Under this
> premise, I said that you can do this by joining two stream tables. But as
> you know, this join depends on the time period in which the state is saved.
> To make it equivalent to a dimension table, you must permanently save the
> state of the stream table that is defined as a "dimension table." I just
> said that modifying the relevant configuration in Flink can do this, Not
> for a single table.
>
> 2) Imagine that there are one million records in two tables. The records
> in both tables are just beginning to stream into flink, and the records as
> dimension tables are not fully arrived. Therefore, your matching results
> may not be as accurate as directly querying Mysql.
>
> In fact, the current stream & stream join is not very mature, there are
> some problems in semantics, I personally recommend that you return to
> stream/batch (mysql) join. For more principle content, I recommend you read
> a book, referred to as 《DDIA》.
>
> Thanks, vino.
>
> 徐涛  于2018年9月25日周二 下午5:48写道:
>
>> Hi Vino,
>> I do not quite understand in some sentences below, would you please help
>> explain it a bit more detailedly?
>> 1. “*such as setting the state retention time of one of the tables to be
>> permanent*” , as I know, the state retention time is a global config, I
>> can not set this property per table.
>> 2. "*you may not be able to match the results, because the data
>> belonging to the mysql table is just beginning to play as a stream*”
>>  Why it is not able to match the results?
>>
>> Best
>> Henry
>>
>> 在 2018年9月25日,下午5:29,vino yang  写道:
>>
>> Hi Henry,
>>
>> If you have converted the mysql table to a flink stream table. In flink
>> table/sql, streams and stream joins can also do this, such as setting the
>> state retention time of one of the tables to be permanent. But when the job
>> is just running, you may not be able to match the results, because the data
>> belonging to the mysql table is just beginning to play as a stream.
>>
>> Thanks, vino.
>>
>> 徐涛  于2018年9月25日周二 下午5:10写道:
>>
>>> Hi Vino & Hequn,
>>> I am now using the table/sql API, if I import the mysql table as a
>>> stream then convert it into a table, it seems that it can also be a
>>> workaround for batch/streaming joining. May I ask what is the difference
>>> between the UDTF method? Does this implementation has some defects?
>>> Best
>>> Henry
>>>
>>> 在 2018年9月22日,上午10:28,Hequn Cheng  写道:
>>>
>>> Hi
>>>
>>> +1 for vino's answer.
>>> Also, this kind of join will be supported in FLINK-9712
>>> . You can check more
>>> details in the jira.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang  wrote:
>>>
 Hi Henry,

 There are three ways I can think of:

 1) use DataStream API, implement a flatmap UDF to access dimension
 table;
 2) use table/sql API, implement a UDTF to access dimension table;
 3) customize the table/sql join API/statement's implementation (and
 change the physical plan)

 Thanks, vino.

 徐涛  于2018年9月21日周五 下午4:43写道:

> Hi All,
> Sometimes some “dimension table” need to be joined from the
> "fact table", if data are not joined before sent to Kafka.
> So if the data are joined in Flink, does the “dimension table”
> have to be import as a stream, or there are some other ways can achieve 
> it?
> Thanks a lot!
>
> Best
> Henry


>>>
>>


Re: How to join stream and batch data in Flink?

2018-09-25 Thread vino yang
Hi Henry,

1) I don't recommend this method very much, but you said that you expect to
convert mysql table to stream and then to flink table. Under this premise,
I said that you can do this by joining two stream tables. But as you know,
this join depends on the time period in which the state is saved. To make
it equivalent to a dimension table, you must permanently save the state of
the stream table that is defined as a "dimension table." I just said that
modifying the relevant configuration in Flink can do this, Not for a single
table.

2) Imagine that there are one million records in two tables. The records in
both tables are just beginning to stream into flink, and the records as
dimension tables are not fully arrived. Therefore, your matching results
may not be as accurate as directly querying Mysql.

In fact, the current stream & stream join is not very mature, there are
some problems in semantics, I personally recommend that you return to
stream/batch (mysql) join. For more principle content, I recommend you read
a book, referred to as 《DDIA》.
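
As a reference for point 1) above, a sketch (not from this thread; table names are
invented) of where that retention configuration lives in the Table API: it is attached
per query via StreamQueryConfig, so it covers the whole join rather than a single table,
and leaving it unset (the default) is the "keep state forever" case:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// tEnv is assumed to be an existing StreamTableEnvironment with both tables registered
StreamQueryConfig qConfig = tEnv.queryConfig();
// state idle for less than 12h is always kept; state idle for more than 24h may be dropped
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table joined = tEnv.sqlQuery(
    "SELECT s.id, s.v, d.name FROM StreamTable AS s JOIN DimTable AS d ON s.id = d.id");
tEnv.toRetractStream(joined, Row.class, qConfig);  // the config is applied when the query is translated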

Thanks, vino.

徐涛  于2018年9月25日周二 下午5:48写道:

> Hi Vino,
> I do not quite understand in some sentences below, would you please help
> explain it a bit more detailedly?
> 1. “*such as setting the state retention time of one of the tables to be
> permanent*” , as I know, the state retention time is a global config, I
> can not set this property per table.
> 2. "*you may not be able to match the results, because the data belonging
> to the mysql table is just beginning to play as a stream*”  Why it is not
> able to match the results?
>
> Best
> Henry
>
> 在 2018年9月25日,下午5:29,vino yang  写道:
>
> Hi Henry,
>
> If you have converted the mysql table to a flink stream table. In flink
> table/sql, streams and stream joins can also do this, such as setting the
> state retention time of one of the tables to be permanent. But when the job
> is just running, you may not be able to match the results, because the data
> belonging to the mysql table is just beginning to play as a stream.
>
> Thanks, vino.
>
> 徐涛  于2018年9月25日周二 下午5:10写道:
>
>> Hi Vino & Hequn,
>> I am now using the table/sql API, if I import the mysql table as a stream
>> then convert it into a table, it seems that it can also be a workaround for
>> batch/streaming joining. May I ask what is the difference between the UDTF
>> method? Does this implementation has some defects?
>> Best
>> Henry
>>
>> 在 2018年9月22日,上午10:28,Hequn Cheng  写道:
>>
>> Hi
>>
>> +1 for vino's answer.
>> Also, this kind of join will be supported in FLINK-9712
>> . You can check more
>> details in the jira.
>>
>> Best, Hequn
>>
>> On Fri, Sep 21, 2018 at 4:51 PM vino yang  wrote:
>>
>>> Hi Henry,
>>>
>>> There are three ways I can think of:
>>>
>>> 1) use DataStream API, implement a flatmap UDF to access dimension table;
>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>> 3) customize the table/sql join API/statement's implementation (and
>>> change the physical plan)
>>>
>>> Thanks, vino.
>>>
>>> 徐涛  于2018年9月21日周五 下午4:43写道:
>>>
 Hi All,
 Sometimes some “dimension table” need to be joined from the
 "fact table", if data are not joined before sent to Kafka.
 So if the data are joined in Flink, does the “dimension table”
 have to be import as a stream, or there are some other ways can achieve it?
 Thanks a lot!

 Best
 Henry
>>>
>>>
>>
>


Re: Get last element of a DataSet

2018-09-25 Thread Fabian Hueske
Hi,

Can you post the full stacktrace?

Thanks, Fabian

Am Di., 25. Sep. 2018 um 12:55 Uhr schrieb Alejandro Alcalde <
algu...@gmail.com>:

> Hi,
>
> I am trying to improve the efficiency of this code:
>
> discretized.map(_._2)
>   .name("Map V")
>   .reduce((_, b) ⇒ b)
>   .name("Get Last V")
>
> I am just interested in the last element of discretized.
>
> I've seen this SO question:
> https://stackoverflow.com/questions/45076310/how-to-get-only-the-last-element-of-the-large-dataset-in-flink
>
> I've tried with this code:
>
> discretized.mapPartition(in ⇒ if (!in.hasNext) in.map(_._2) else Seq())
>   .setParallelism(1)
>   .name("Get Last V")
>
> But I am getting this error:
>
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>
> Best
>
> -- Alejandro Alcalde - elbauldelprogramador.com
>


timewindowall and aggregate(count): count 0 when no event in the window

2018-09-25 Thread Luigi Sgaglione
Hi,

I'm trying to count the number of events in a window (every 5 seconds).
The code below works fine if there are events in the window, if there are
no events in the window no output is emitted.

What I want to achieve is a count of 0 when there are no events in the time
window of 5 seconds.

Can you help me?


Thanks

DataStreamSource stream = env.addSource(myConsumer);
DataStream tupledStream = stream.map(new
Json2Tuple());
SingleOutputStreamOperator out = tupledStream
.filter(new FilterFunction() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(InputTuple_usb_monitor arg0) throws Exception {
return (arg0.usb_code.equals("UMDFHostDeviceArrivalBegin"));
}
})
.timeWindowAll(Time.seconds(5))
.aggregate(new AvgCountAggregate());
out.print();


Get last element of a DataSet

2018-09-25 Thread Alejandro Alcalde
Hi,

I am trying to improve the efficiency of this code:

discretized.map(_._2)
  .name("Map V")
  .reduce((_, b) ⇒ b)
  .name("Get Last V")

I am just interested in the last element of discretized.

I've seen this SO question:
https://stackoverflow.com/questions/45076310/how-to-get-only-the-last-element-of-the-large-dataset-in-flink

I've tried with this code:

discretized.mapPartition(in ⇒ if (!in.hasNext) in.map(_._2) else Seq())
  .setParallelism(1)
  .name("Get Last V")

But I am getting this error:

Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

Best

-- Alejandro Alcalde - elbauldelprogramador.com


Re: Could not retrieve the redirect address - No REST endpoint has been started

2018-09-25 Thread PedroMrChaves
Hello,

Thank you for the reply.

The problem sometimes happens when there is a jobmanager failover. I've
attached the jobmanager logs for further debugging. 

flink-flink-jobmanager-1-demchcep00-01.log

  

Thank you and Regards,
Pedro Chaves. 



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread clay4444
Hi,
I am using Flink's Table API. I receive data from Kafka and register it as
a table, then process it with a SQL statement, and finally convert the result
back to a stream and write it to a directory. The code looks like this:

def main(args: Array[String]): Unit = {

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val tEnv = TableEnvironment.getTableEnvironment(sEnv)

   tEnv.connect(
  new Kafka()
.version("0.11")
.topic("user")
.startFromEarliest()
.property("zookeeper.connect", "")
.property("bootstrap.servers", "")
)
  .withFormat(
new Json()
  .failOnMissingField(false)
  .deriveSchema()   // use the table's schema
  )
  .withSchema(
new Schema()
  .field("username_skey", Types.STRING)
  )
  .inAppendMode()
  .registerTableSource("user")
 val userTest: Table = tEnv.sqlQuery(
  """
   select ** from ** join **""".stripMargin)
val endStream = tEnv.toRetractStream[Row](userTest)
endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE)
sEnv.execute("Test_New_Sign_Student")
 }

I was successful in the local test, but when I submit the following command
in the cluster, I get the following error:

===
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could
not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=zookeeper.connect

schema.9.name=roles
schema.9.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory


at
org.apache.flink.table.factories.TableFactoryService$.filterByFactoryClass(TableFactoryService.scala:176)
at
org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:125)
at
org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
at
org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:50)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44)
at
org.clay.test.Test_New_Sign_Student$.main(Test_New_Sign_Student.scala:64)
at
org.clay.test.Test_New_Sign_Student.main(Test_New_Sign_Student.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)



Re: How to join stream and batch data in Flink?

2018-09-25 Thread 徐涛
Hi Vino,
I do not quite understand some of the sentences below; would you please
explain them in a bit more detail?
1. “such as setting the state retention time of one of the tables to be 
permanent” , as I know, the state retention time is a global config, I can not 
set this property per table.
2. "you may not be able to match the results, because the data 
belonging to the mysql table is just beginning to play as a stream”  Why is it
not able to match the results?

Best
Henry

> 在 2018年9月25日,下午5:29,vino yang  写道:
> 
> Hi Henry,
> 
> If you have converted the mysql table to a flink stream table. In flink 
> table/sql, streams and stream joins can also do this, such as setting the 
> state retention time of one of the tables to be permanent. But when the job 
> is just running, you may not be able to match the results, because the data 
> belonging to the mysql table is just beginning to play as a stream.
> 
> Thanks, vino.
> 
> 徐涛 mailto:happydexu...@gmail.com>> 于2018年9月25日周二 
> 下午5:10写道:
> Hi Vino & Hequn,
>   I am now using the table/sql API, if I import the mysql table as a 
> stream then convert it into a table, it seems that it can also be a 
> workaround for batch/streaming joining. May I ask what is the difference 
> between the UDTF method? Does this implementation has some defects?
>   
> Best
> Henry
> 
>> 在 2018年9月22日,上午10:28,Hequn Cheng > > 写道:
>> 
>> Hi
>> 
>> +1 for vino's answer. 
>> Also, this kind of join will be supported in FLINK-9712 
>> . You can check more 
>> details in the jira.
>> 
>> Best, Hequn
>> 
>> On Fri, Sep 21, 2018 at 4:51 PM vino yang > > wrote:
>> Hi Henry,
>> 
>> There are three ways I can think of:
>> 
>> 1) use DataStream API, implement a flatmap UDF to access dimension table;
>> 2) use table/sql API, implement a UDTF to access dimension table;
>> 3) customize the table/sql join API/statement's implementation (and change 
>> the physical plan)
>> 
>> Thanks, vino.
>> 
>> 徐涛 mailto:happydexu...@gmail.com>> 于2018年9月21日周五 
>> 下午4:43写道:
>> Hi All,
>> Sometimes some “dimension table” need to be joined from the "fact 
>> table", if data are not joined before sent to Kafka.
>> So if the data are joined in Flink, does the “dimension table” have 
>> to be import as a stream, or there are some other ways can achieve it?
>> Thanks a lot!
>> 
>> Best
>> Henry
> 



Re: How to join stream and batch data in Flink?

2018-09-25 Thread vino yang
Hi Henry,

If you have converted the mysql table to a flink stream table, then in flink
table/sql, streams and stream joins can also do this, such as setting the
state retention time of one of the tables to be permanent. But when the job
is just running, you may not be able to match the results, because the data
belonging to the mysql table is just beginning to play as a stream.

Thanks, vino.

徐涛  于2018年9月25日周二 下午5:10写道:

> Hi Vino & Hequn,
> I am now using the table/sql API, if I import the mysql table as a stream
> then convert it into a table, it seems that it can also be a workaround for
> batch/streaming joining. May I ask what is the difference between the UDTF
> method? Does this implementation has some defects?
> Best
> Henry
>
> 在 2018年9月22日,上午10:28,Hequn Cheng  写道:
>
> Hi
>
> +1 for vino's answer.
> Also, this kind of join will be supported in FLINK-9712
> . You can check more
> details in the jira.
>
> Best, Hequn
>
> On Fri, Sep 21, 2018 at 4:51 PM vino yang  wrote:
>
>> Hi Henry,
>>
>> There are three ways I can think of:
>>
>> 1) use DataStream API, implement a flatmap UDF to access dimension table;
>> 2) use table/sql API, implement a UDTF to access dimension table;
>> 3) customize the table/sql join API/statement's implementation (and
>> change the physical plan)
>>
>> Thanks, vino.
>>
>> 徐涛  于2018年9月21日周五 下午4:43写道:
>>
>>> Hi All,
>>> Sometimes a "dimension table" needs to be joined to the
>>> "fact table", if the data are not joined before being sent to Kafka.
>>> So if the data are joined in Flink, does the "dimension table"
>>> have to be imported as a stream, or are there other ways to achieve it?
>>> Thanks a lot!
>>>
>>> Best
>>> Henry
>>
>>
>


Re: JobManager container is running beyond physical memory limits

2018-09-25 Thread eSKa
We don't set it anywhere, so I guess it's the default of 16. Do you think that's
too much?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to join stream and batch data in Flink?

2018-09-25 Thread 徐涛
Hi Vino & Hequn,
I am now using the table/sql API. If I import the mysql table as a 
stream and then convert it into a table, it seems that it can also be a workaround 
for batch/streaming joining. May I ask how this differs from the UDTF 
method? Does this implementation have any defects?

Best
Henry

> On Sep 22, 2018, at 10:28, Hequn Cheng wrote:
> 
> Hi
> 
> +1 for vino's answer. 
> Also, this kind of join will be supported in FLINK-9712. You can check more 
> details in the jira.
> 
> Best, Hequn
> 
> On Fri, Sep 21, 2018 at 4:51 PM vino yang wrote:
> Hi Henry,
> 
> There are three ways I can think of:
> 
> 1) use DataStream API, implement a flatmap UDF to access dimension table;
> 2) use table/sql API, implement a UDTF to access dimension table;
> 3) customize the table/sql join API/statement's implementation (and change 
> the physical plan)
> 
> Thanks, vino.
> 
> On Fri, Sep 21, 2018 at 16:43, 徐涛 <happydexu...@gmail.com> wrote:
> Hi All,
> Sometimes a "dimension table" needs to be joined to the "fact 
> table", if the data are not joined before being sent to Kafka.
> So if the data are joined in Flink, does the "dimension table" have 
> to be imported as a stream, or are there other ways to achieve it?
> Thanks a lot!
> 
> Best
> Henry
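
As an illustration of option 1 quoted above (a DataStream flatmap that enriches each fact record against the dimension table), a minimal sketch might look like the following. FactEvent, EnrichedEvent, DimRecord, and DimensionClient are hypothetical application types, not Flink APIs:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Looks up the dimension record for each fact event and emits the joined result.
    public class DimensionEnricher extends RichFlatMapFunction<FactEvent, EnrichedEvent> {

        private transient DimensionClient client;   // hypothetical lookup client (JDBC, cache, ...)

        @Override
        public void open(Configuration parameters) throws Exception {
            client = DimensionClient.connect();      // open the connection once per parallel task
        }

        @Override
        public void flatMap(FactEvent fact, Collector<EnrichedEvent> out) throws Exception {
            DimRecord dim = client.lookup(fact.getDimKey());
            if (dim != null) {
                out.collect(new EnrichedEvent(fact, dim));   // inner-join semantics
            }
            // no output when there is no match; emit with null fields for left-join semantics
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }

The practical difference from the UDTF route is mainly where the lookup lives: here it is plain DataStream code with no join state to configure, while the UDTF keeps the join expressible in Table/SQL.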



Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-25 Thread Stefan Richter
Hi,

I cannot spot anything bad or "wrong" about your job configuration. Maybe you 
can save and send the logs if it happens again? Did you observe this 
only once, often, or is it something that is even reproducible?

Best,
Stefan

> On 24.09.2018 at 10:15, PedroMrChaves wrote:
> 
> Hello Stefan, 
> 
> Thank you for the help.
> 
> I've actually lost those logs due to several cluster restarts that we did,
> which caused log rotation (limit = 5 versions).
> The log lines that I've posted were the only ones that showed signs of
> a problem.
> 
> The configuration of the job is as follows:
> 
>    private static final int DEFAULT_MAX_PARALLELISM = 16;
>    private static final int CHECKPOINTING_INTERVAL = 1000;
>    private static final int MIN_PAUSE_BETWEEN_CHECKPOINTS = 1000;
>    private static final int CHECKPOINT_TIMEOUT = 6;
>    private static final int INTERVAL_BETWEEN_RESTARTS = 120;
>    (...)
> 
>    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>    environment.setMaxParallelism(DEFAULT_MAX_PARALLELISM);
>    environment.enableCheckpointing(CHECKPOINTING_INTERVAL,
>        CheckpointingMode.EXACTLY_ONCE);
>    environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS);
>    environment.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT);
>    environment.setRestartStrategy(RestartStrategies.noRestart());
>    environment.setParallelism(parameters.getInt(JOB_PARALLELISM));
> 
> The kafka consumer/producer configuration is:
> 
>    properties.put("value.deserializer",
>        "org.apache.kafka.common.serialization.StringDeserializer");
>    properties.put("max.request.size", "1579193");
>    properties.put("processing.guarantee", "exactly_once");
>    properties.put("isolation.level", "read_committed");
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Question about Window Tigger

2018-09-25 Thread Chang Liu
Hi Rong,

Thanks for your reply. :)

Best regards,

Chang Liu 刘畅


> On 19 Sep 2018, at 18:20, Rong Rong  wrote:
> 
> Hi Chang,
> 
> There were some previous discussions regarding how to debug watermarks and 
> window triggers [1].
> Basically, if there's no data for some partitions, there's no way to advance 
> the watermark, as Flink would not be able to determine whether this is due to 
> a network failure or there actually being no data arriving at the source.
> I think your use case is better off using SlidingProcessingTimeWindow.
> 
> Thanks,
> Rong
> 
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/watermark-does-not-progress-td22315.html
> 
> On Wed, Sep 19, 2018 at 1:48 AM Chang Liu wrote:
> Dear All,
> 
> I have a question about the Window Trigger: let's say I would like to use 
> the SlidingEventTimeWindow (60 seconds window size + 1 second window shift) 
> to count the number of records per window, and I am using Event Time with 
> periodic watermarking with a certain maxOutOfOrderness time.
> 
> Sometimes, what happens is: during a certain period there is no incoming event, 
> and then the watermark for triggering the window does not arrive. The 
> last several records will just stay in the window, and it will fire 
> only when the window sees the watermark that triggers it.
> 
> What I would like to achieve is: if no watermark arrives within a 
> certain time (maybe measured by the system clock?), I can still trigger the 
> window to fire, whether or not a new event arrives. Then I can 
> still get the window count for this window, without waiting for the next event, 
> which could arrive after a long time.
> 
> Do you have any idea how I can do this? Many thanks :)
> 
> Best regards,
> 
> Chang Liu 刘畅
> 
> 
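
A minimal sketch of the SlidingProcessingTimeWindow approach Rong suggests above, which fires based on the wall clock and therefore does not depend on watermarks driven by new events. The events stream, the Event type, and its getKey() accessor are assumptions:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Count records per 60s window, sliding every second, driven by processing time.
    DataStream<Long> counts = events
        .keyBy(e -> e.getKey())
        .window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(1)))
        .aggregate(new CountAggregate());

    // Simple counting aggregate: one Long accumulator per key and window.
    public static class CountAggregate implements AggregateFunction<Event, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }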



Flink - Process datastream in a bounded context (like Dataset) - Unifying stream & batch

2018-09-25 Thread bastien dine
Hello everyone,

I need to join some files to perform some processing. The DataSet API is a
perfect way to achieve this, and I am able to do it when I read the files in batch
(csv).

However, in the production environment I will receive those files as Kafka
messages (one message = one line of a file).
So I am considering using a global window + a custom trigger on an end-of-file
message and a process window function.
But I cannot go very far with that, as process is only one function and
chaining functions will be a pain. I don't think that emitting a datastream
& windows / trigger on EOF before every process function is a good idea.

However, I would like to work in a bounded way once I have received all of my
elements (after the trigger on the global window), like the DataSet API, as I
will join on my whole dataset.

I thought maybe it would be a good idea to go for the Table API and a group
window? But you cannot have a custom trigger and a global group window on a
table (like the global window on a datastream), can you?
The best alternative would be to create a dataset as the result of my process
window function, but I don't think this is possible, is it?

Best Regards,
Bastien
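
A minimal sketch of the custom end-of-file trigger described above, firing and purging the global window when the EOF marker arrives. The FileLine type and its isEndOfFile() flag are assumptions about how the EOF message is encoded:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    // Keeps buffering lines until the EOF marker of the file arrives, then fires once.
    public class EndOfFileTrigger extends Trigger<FileLine, GlobalWindow> {

        @Override
        public TriggerResult onElement(FileLine element, long timestamp,
                                       GlobalWindow window, TriggerContext ctx) throws Exception {
            return element.isEndOfFile()
                    ? TriggerResult.FIRE_AND_PURGE   // evaluate the whole file, then clear the window
                    : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) {
            // nothing to clean up: no timers or custom state registered
        }
    }

It could then be wired up as lines.keyBy(...).window(GlobalWindows.create()).trigger(new EndOfFileTrigger()).process(...), with the join logic inside the ProcessWindowFunction.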


Re: JobManager container is running beyond physical memory limits

2018-09-25 Thread Yun Tang
Hi

If your JM's container is killed by YARN for exceeding the physical memory limit 
and your job's code has not changed but you just bumped the Flink version, I think 
you could use the jmap command to dump the memory of your JobManager and compare 
1.4.2 with 1.5.2, and you could also enable the GC log to see 
the difference.

In my experience, a "jobmanager.execution.attempts-history-size" that is configured 
too large, operator names that are too long for the metrics stored in the JM, or even a 
checkpoint path that is not configured (which always returns a ByteStreamStateHandle back to the JM) 
can all impact the JM's memory footprint.

Best
Yun

From: eSKa 
Sent: Tuesday, September 25, 2018 14:45
To: user@flink.apache.org
Subject: Re: JobManager container is running beyond physical memory limits

anyone?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


"405 HTTP method POST is not supported by this URL" is returned when POST a REST url on Flink on yarn

2018-09-25 Thread 徐涛
Hi All,
I am trying to POST a RESTful URL to trigger a savepoint; the 
Flink version is 1.6.0.
When I execute the POST locally, everything is OK, but when I POST 
the URL against a Flink on YARN application, the following error is returned:
"405 HTTP method POST is not supported by this URL". I guess it is 
caused by a YARN limitation. 
(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/REST-API-quot-broken-quot-on-YARN-because-POST-is-not-allowed-via-YARN-proxy-td19329.html)
Is there a workaround for this now?

Best
Henry

Re: When should the RETAIN_ON_CANCELLATION option be used?

2018-09-25 Thread vino yang
Hi Henry,

Your understanding is correct. A checkpoint itself is for recovery purposes.
If you cancel a job, Flink assumes it doesn't make sense to keep the
checkpoint around. If you want to recover after a cancel, you should use
cancel with savepoint. So, by default, you don't need to manually clean up
checkpoint metadata unless you plan to use externalized checkpoints.

Thanks, vino.
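
For reference, a minimal sketch of where this is configured in code on Flink 1.6; the checkpoint interval is an arbitrary example value:

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);   // example interval: one checkpoint per minute

    // DELETE_ON_CANCELLATION cleans up on cancel but keeps the checkpoint on failure;
    // RETAIN_ON_CANCELLATION keeps it even after cancel, at the cost of manual cleanup.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);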

On Tue, Sep 25, 2018 at 14:59, 徐涛 wrote:

>  Hi Vino,
> So I will use the default setting of DELETE_ON_CANCELLATION. When the
> program is cancelled the checkpoint will be deleted; when the program
> fails, because the checkpoint will not be deleted, I will still have a
> checkpoint that can be used to resume.
> Please correct me if I am wrong.
>
> Thanks.
>
> Best
> Henry
>
> On Sep 25, 2018, at 14:22, vino yang wrote:
>
> Hi Henry,
>
> I gave a blue comment in your original email.
>
> Thanks, vino.
>
> On Tue, Sep 25, 2018 at 12:56, 徐涛 wrote:
>
>> Hi Vino,
>> What is the definition of, and difference between, a job cancel and a job failure?
>> Can I say that if the program is shut down manually, then it is a job
>> cancel, and if the program is shut down due to some error, it
>> is a job failure?
>>
>>
> This is not entirely true, as manually triggering a cancel may also
> lead to failure. You can think of it this way: if a human triggers the cancel and each
> task instance is correctly canceled, then the job's final status is
> canceled. If the job terminates due to various anomalies, its final state is failed.
>
>
>> This is important because it is the prerequisite for the following
>> question:
>>
>> In the documentation of Flink 1.6, it says:
>> "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: Retain the
>> checkpoint when the job is cancelled. Note that you have to manually clean
>> up the checkpoint state after cancellation in this case.
>> ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the
>> checkpoint when the job is cancelled. The checkpoint state will only be
>> available if the job fails."
>> But it does not say whether the checkpoint will be retained on failure.
>> If the checkpoint behavior on failure is the same as on cancellation, then I have to
>> use RETAIN_ON_CANCELLATION, because if I do not use it, the checkpoint will
>> be deleted when the job fails.
>> If the checkpoint behavior on failure is not to delete, then in that case it is
>> safe when the job fails.
>>
>
> In the configuration, there are two enumeration classes,
> `CheckpointRetentionPolicy` and `ExternalizedCheckpointCleanup`, and you need
> to consider which configuration you want to use. Your main concern is
> ExternalizedCheckpointCleanup, which controls the cleanup of the metadata for
> externalized checkpoints. Are you sure you want to use it? Flink defaults
> to self-managed checkpoint cleanup, i.e. non-externalized
> checkpoints.
>
>
>> Best
>> Henry
>>
>>
>> On Sep 25, 2018, at 11:16, vino yang wrote:
>>
>> Hi Henry,
>>
>> To answer your questions:
>>
>> What is the definition of, and difference between, a job cancel and a job failure?
>>
>> > Both cancellation and failure cause the job to enter a terminal state.
>> But cancellation is triggered manually and terminates normally, while
>> failure is usually a passive termination due to an
>> exception.
>>
>> If I use the DELETE_ON_CANCELLATION option, do I in this case still have a
>> checkpoint to resume the program from?
>>
>> > No. If you use externalized checkpoints, you cannot resume from
>> them after the job has been cancelled.
>>
>> I mean, if I can guarantee that a savepoint is always made before a
>> manual cancellation and I use the DELETE_ON_CANCELLATION option on
>> checkpoints, is there any chance that I will not have a checkpoint to
>> recover from?
>>
>> > From the latest source code, savepoints are not affected by the
>> CheckpointRetentionPolicy; they need to be cleaned up manually.
>>
>> Thanks, vino.
>>
>> On Tue, Sep 25, 2018 at 11:06, 徐涛 wrote:
>>
>>> Hi All,
>>> I mean, if I can guarantee that a savepoint is always made before a
>>> manual cancellation and I use the DELETE_ON_CANCELLATION option on
>>> checkpoints, is there any chance that I will not have a checkpoint to
>>> recover from?
>>> Thanks a lot.
>>>
>>> Best
>>> Henry
>>>
>>>
>>>
>>> On Sep 25, 2018, at 10:41, 徐涛 wrote:
>>>
>>> Hi All,
>>> In the flink documentation, it says
>>> DELETE_ON_CANCELLATION: "Delete the checkpoint when the job is
>>> cancelled. The checkpoint state will only be available if the job fails."
>>> What is the definition of, and difference between, a job cancel and a
>>> job failure? If I run the program on yarn, then after a few days the yarn
>>> application may fail for some reason.
>>> If I use the DELETE_ON_CANCELLATION option, do I in this case still have a
>>> checkpoint to resume the program from?
>>>
>>> If checkpoints are *only* deleted when I cancel the program, I can
>>> always take a savepoint before cancellation. Then it seems that I can
>>> always just set DELETE_ON_CANCELLATION.
>>> I cannot find a case where RETAIN_ON_CANCELLATION should be used.