Re: Flink operator UUID and serialVersionUID

2018-11-27 Thread bupt_ljy
Hi Jayant, If you change the “MyObject” class in a way that affects the serialized value, then the “MyObject” instance can’t be deserialized when restoring, which causes the restore to fail. You can just use the default serialVersionUID instead of defining it explicitly (it makes no difference if you
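
For illustration, a minimal sketch of the kind of class under discussion (the MyObject name comes from the thread; the fields are assumed):

    import java.io.Serializable;

    // Hypothetical state type, per the thread's "MyObject".
    public class MyObject implements Serializable {
        // An explicit serialVersionUID keeps Java serialization from rejecting
        // the class after compatible changes (e.g. an added method), but
        // incompatible field changes can still break deserialization on restore.
        private static final long serialVersionUID = 1L;

        private String name;
        private int count;
    }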

Re: Flink operator UUID and serialVersionUID

2018-11-27 Thread Jayant Ameta
Thanks for clarifying, Jiayi. If there is a change in the "MyObject" class, would it help to have a serialVersionUID defined? Thanks, Jayant On Wed, Nov 28, 2018 at 12:52 PM bupt_ljy wrote: > Hi, Jayant > >1. The uuid is a unique identifier for a specific operator, which > means that Flink

Re: Flink operator UUID and serialVersionUID

2018-11-27 Thread bupt_ljy
Hi, Jayant 1. The uuid is a unique identifier for a specific operator, which means that Flink uses the uuid to recognize the operator when restoring. 2. The operator already implements the Serializable interface, so you don’t need to do it explicitly. 3. The type information of
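
A short sketch of point 1, assigning a stable uuid to an operator (the pipeline and uid string are assumed for illustration):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // A stable uid lets Flink match saved state back to this operator when
    // restoring, even if the surrounding job graph changes.
    DataStream<Long> ones = env
        .fromElements("a", "b", "a")
        .map(s -> 1L)
        .uid("to-ones-map");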

Flink operator UUID and serialVersionUID

2018-11-27 Thread Jayant Ameta
Hi all, I have a few questions regarding serial versions: 1. The production readiness checklist mentions using uuids for operators. How is it different from setting a serialVersionUID on an

Re: OutOfMemoryError while doing join operation in flink

2018-11-27 Thread Akshay Mendole
Hi Zhijiang, Thanks for the explanation and the suggested workaround. While this can work for the example stated above, we have more complex use cases where we would have to re-tune the above parameters. FYI, we ran into the same problems when we did a simple groupBy on the skewed

flink hadoop 3 integration plan

2018-11-27 Thread Ming Zhang
Hi All, we now plan to move to CDH6, which is based on Hadoop 3. Does anyone know the plan for Flink Hadoop 3 integration? Thanks in advance. Ming.He

Memory does not be released after job cancellation

2018-11-27 Thread Nastaran Motavali
Hi, I have a simple Java application that uses Flink 1.6.2. When I run the jar file, I can see that the job consumes part of the host's main memory. If I cancel the job, the consumed memory is not released until I stop the whole cluster. How can I release the memory after cancellation? I have

Checkpointing to gcs taking too long

2018-11-27 Thread prakhar_mathur
I am trying to run Flink on Kubernetes and to push checkpoints to Google Cloud Storage. Below is the Dockerfile: `FROM flink:1.6.2-hadoop28-scala_2.11-alpine RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar
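
For context, a minimal sketch of pointing checkpoints at GCS once the connector jar is on the classpath (the bucket name and interval are assumptions):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);  // checkpoint every 60 seconds

    // gs:// paths resolve through the GCS Hadoop connector installed in the
    // Dockerfile above; the bucket name here is hypothetical.
    env.setStateBackend(new FsStateBackend("gs://my-bucket/flink-checkpoints"));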

MapState - TypeSerializer

2018-11-27 Thread Alexey Trenikhun
Hello, the Flink documentation states that “TypeSerializers and TypeSerializerConfigSnapshots are written as part of checkpoints along with the state values”. In the context of MapState, does this mean one TypeSerializer per MapState entry, or only one per state? Alexey
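
For reference, the MapState in question is declared through a single descriptor, typically inside a RichFunction's open() (the state and type names here are assumed):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;

    // One descriptor configures a single key serializer and a single value
    // serializer for the whole map state, not one per entry.
    MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("wordCounts", String.class, Long.class);
    MapState<String, Long> counts = getRuntimeContext().getMapState(descriptor);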

Re: Store Predicate or any lambda in MapState

2018-11-27 Thread Andrey Zagrebin
Hi, it may just be a dependency problem if the library where the lambda is defined is not on the job's classpath. On the other hand, we might want to investigate this, because Flink uses an older version (2.24.0) of Kryo. According to this issue [1], lambda support was added to Kryo
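
As a related aside, a standard Java trick for making a stored lambda serializable at all is to type it with a Serializable functional interface; a sketch (the interface name is made up, and whether this resolves the Kryo 2.24.0 issue above is a separate question):

    import java.io.Serializable;
    import java.util.function.Predicate;

    // A lambda assigned to this type is captured as a serializable lambda by
    // the Java compiler, so it can round-trip through Java serialization.
    public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}

    // Usage:
    SerializablePredicate<Long> isPositive = x -> x > 0;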

Re: Flink - Metric are not reported

2018-11-27 Thread Chesnay Schepler
Please enable WARN logging and check for warnings from the SLF4JReporter and/or MetricQueryService. On 27.11.2018 17:00, bastien dine wrote: Hello everyone, once again I require your help! I am trying to report a custom metric (see my code below), yet I do not see it anywhere: not in the

Flink - Metric are not reported

2018-11-27 Thread bastien dine
Hello everyone, once again I require your help! I am trying to report a custom metric (see my code below), yet I do not see it anywhere: not in the metrics tab of my tasks, not in the REST API, and not in the declared SLF4J reporter. Can someone help me debug this? Here is my RichMap
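
For reference, a generic sketch of the usual counter registration pattern in a RichMapFunction (the class and metric names are made up, not the poster's code):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class CountingMapper extends RichMapFunction<String, String> {
        private transient Counter counter;

        @Override
        public void open(Configuration parameters) {
            // Register once, in open(), on the runtime metric group; metrics
            // registered elsewhere may never reach a reporter.
            counter = getRuntimeContext().getMetricGroup().counter("myCounter");
        }

        @Override
        public String map(String value) {
            counter.inc();
            return value;
        }
    }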

Re: kafka to hdfs flow avro schema evolution

2018-11-27 Thread Dawid Wysakowicz
Hi, generally speaking you can handle reading multiple Avro schema versions as long as you have access to all the other versions. This is usually done with a schema registry service. Flink comes with the utility ConfluentRegistryAvroDeserializationSchema, which allows you to read binary-encoded
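
A sketch of wiring that schema up to a Kafka source (the topic, registry URL, reader schema, and consumer version are all assumptions):

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    // Reader schema: incoming records are resolved against this (latest) version.
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"}]}");

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "ingest");

    // The deserializer looks up each record's writer schema in the registry
    // and resolves it against the reader schema.
    FlinkKafkaConsumer011<GenericRecord> consumer = new FlinkKafkaConsumer011<>(
        "events",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            readerSchema, "http://schema-registry:8081"),
        props);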

Re: how to override s3 key config in flink job

2018-11-27 Thread Andrey Zagrebin
Hi Tony, File system factories are class-loaded in the running JVMs of the task executors. That is why their configured objects are shared by different Flink jobs. It is not currently possible to change their options per created file system and per job. This could be changed, e.g. for s3, by

Re: your advice please regarding state

2018-11-27 Thread Avi Levi
Thank you very much. Got it. On Tue, Nov 27, 2018 at 12:53 PM Fabian Hueske wrote: > Hi Avi, > > I'd definitely go for approach #1. > Flink will hash partition the records across all nodes. This is basically > the same as a distributed key-value store sharding keys. > I would not try to fine

Re: Tentative release date for 1.6.3

2018-11-27 Thread Chesnay Schepler
The 1.7.0 release is currently ongoing and expected to be finished within the week. On 26.11.2018 18:36, Vishal Santoshi wrote: +1. Do not see 1.7 release details anywhere. On Mon, Nov 26, 2018, 8:02 AM galantaa wrote: Hi vino, We want to upgrade flink

Re: Flink Exception - assigned slot container was removed

2018-11-27 Thread Andrey Zagrebin
Hi, it can also be that some task managers fail; then pending checkpoints are declined and slots are removed. Can you also have a look into the task manager logs? Best, Andrey > On 26 Nov 2018, at 12:19, qi luo wrote: > > This is weird. Could you paste your entire exception trace here? > >> On

Re: your advice please regarding state

2018-11-27 Thread Fabian Hueske
Hi Avi, I'd definitely go for approach #1. Flink will hash partition the records across all nodes. This is basically the same as a distributed key-value store sharding keys. I would not try to fine tune the partitioning. You should try to use as many keys as possible to ensure an even
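
A minimal sketch of approach #1, keying by a high-cardinality field (the Event type and its fields are assumed):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeyedStateExample {

        // Simple POJO; fields are assumed for illustration.
        public static class Event {
            public String userId;
            public int amount;
            public Event() {}
            public Event(String userId, int amount) {
                this.userId = userId;
                this.amount = amount;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Flink hash-partitions records by key across all parallel
            // instances, much like a distributed key-value store shards keys.
            env.fromElements(new Event("u1", 5), new Event("u2", 7), new Event("u1", 3))
                .keyBy(e -> e.userId)
                .sum("amount")
                .print();

            env.execute("keyed example");
        }
    }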

kafka to hdfs flow avro schema evolution

2018-11-27 Thread CPC
Hi everybody, We are planning to use Flink for our Kafka-to-HDFS ingestion. We are consuming Avro messages encoded as JSON and then writing them to HDFS as Parquet. But our Avro schema changes from time to time in a backward-compatible way, and because of deployments you can see messages like v1

Re: Counter Implementation in Flink

2018-11-27 Thread Chesnay Schepler
The default implementation is the SimpleCounter; however, I believe the implementation isn't of interest compared to how it is used. Please expand on the graph (which metric is represented by what color) and include all of your code that interacts with your counter. On 24.11.2018

Re: how to override s3 key config in flink job

2018-11-27 Thread yinhua.dai
It might be difficult, as the task manager and job manager are pre-started in session mode. It seems that the Flink HTTP server will always use the configuration that you specified when you started your Flink cluster, i.e. via start-cluster.sh; I can't find a way to override it. -- Sent from:

Re:

2018-11-27 Thread Timo Walther
Hi Hengyu, currently, metadata between Flink programs can only be shared via code. For this, we recently introduced a programmatic, descriptor-based way of specifying sources and sinks [1]. However, a catalog such as the Hive metastore would be much easier, and the community is currently working
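
For what is possible within a single program today, a small sketch of reading a registered table's schema via the Table API (the table name follows the question below; the source stream is assumed):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    DataStream<Tuple2<String, Integer>> source =
        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
    tableEnv.registerDataStream("t1", source, "field1, field2");

    // Within the same program the schema is available programmatically;
    // across programs it has to be shared via code (or, later, a catalog).
    TableSchema schema = tableEnv.scan("t1").getSchema();
    String[] columns = schema.getColumnNames();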

[no subject]

2018-11-27 Thread Henry Dai
Hi, is there a way to get a table's metadata in Flink? If I emit a table to Kafka, how can I know the table columns when I subscribe to the Kafka topic and restore the table using tableEnv.registerDataStream("t1", source, "field1, field2 ...") in another Flink program? Flink should provide