Re: Serialization schema

2017-02-23 Thread Mohit Anchlia
But it is not an inner class. On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai wrote: > Since I don’t have your complete code, I’m guessing this is the problem: > Is your `Tuple2Serializer` an inner class? If yes, you should be able to > solve the problem by declaring
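
For readers hitting the same trap: the `Tuple2Serializerr$1` in the stack trace further down this thread is the tell-tale sign of an anonymous inner class, which drags a reference to its non-serializable enclosing instance into serialization. A minimal sketch of the fix, assuming Flink 1.2's SerializationSchema (class name and tuple types are illustrative):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;

    // Declared as a top-level (or static nested) class, so the instance holds
    // no hidden reference to an outer, non-serializable object.
    public class Tuple2Serializer implements SerializationSchema<Tuple2<Integer, Integer>> {
        @Override
        public byte[] serialize(Tuple2<Integer, Integer> tuple) {
            return (tuple.f0 + "," + tuple.f1).getBytes();
        }
    }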

Re: Serialization schema

2017-02-23 Thread Mohit Anchlia
This is, at a high level, what I am doing: Serialize: String s = tuple.getPos(0) + "," + tuple.getPos(1); return s.getBytes() Deserialize: String s = new String(message); String[] sarr = s.split(","); Tuple2 tuple = new Tuple2<>(Integer.valueOf(sarr[0]),

Re: Serialization schema

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi Mohit, As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` contains fields that are not serializable, so `Tuple2Serializer` itself is not serializable. Could you perhaps share your `Tuple2Serializer` implementation with us so we can pinpoint the problem? A snippet

Re: Serialization schema

2017-02-23 Thread Mohit Anchlia
I am using String inside to convert into bytes. On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 wrote: > Hi Mohit > As you did not give the whole code of Tuple2Serializerr, I guess the > reason is that some fields of Tuple2Serializerr do not implement Serializable. > > 2017-02-24 9:07

Re: Serialization schema

2017-02-23 Thread 刘彪
Hi Mohit, as you did not give the whole code of Tuple2Serializerr, I guess the reason is that some fields of Tuple2Serializerr do not implement Serializable. 2017-02-24 9:07 GMT+08:00 Mohit Anchlia : > I wrote a key serialization class to write to kafka however I am getting >

Re: Writing Tuple2 to a sink

2017-02-23 Thread 刘彪
Currently, OutputFormat is used for DataSet, SinkFunction is used for DataStream. Maybe I misunderstand your problem; it would be better if you could give more details. 2017-02-24 5:21 GMT+08:00 Mohit Anchlia : > This works for Kafka but for the other types of sink am I
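
A minimal sketch of the two sink styles, assuming Flink 1.2's batch and streaming APIs (output paths and elements are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.TextOutputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SinkStyles {
        public static void main(String[] args) throws Exception {
            // Batch: a DataSet is written with an OutputFormat.
            ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> batch = batchEnv.fromElements("a", "b", "c");
            batch.output(new TextOutputFormat<String>(new Path("file:///tmp/batch-out")));
            batchEnv.execute("batch sink");

            // Streaming: a DataStream is written with a SinkFunction (addSink),
            // but it can also reuse an OutputFormat.
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = streamEnv.fromElements("x", "y", "z");
            stream.writeUsingOutputFormat(new TextOutputFormat<String>(new Path("file:///tmp/stream-out")));
            streamEnv.execute("stream sink");
        }
    }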

Serialization schema

2017-02-23 Thread Mohit Anchlia
I wrote a key serialization class to write to kafka however I am getting this error. Not sure why, as I've already implemented the interfaces. Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1 at

Re: Writing Tuple2 to a sink

2017-02-23 Thread Mohit Anchlia
This works for Kafka, but for the other types of sink am I supposed to use some type of OutputFormat? On Tue, Feb 21, 2017 at 7:13 PM, 刘彪 wrote: > Hi > I think there is a good way in FlinkKafkaProducerBase.java to deal with > this situation. There is a

Re: Frontend classpath issue

2017-02-23 Thread Robert Metzger
Mh. The user jar is put into every classpath, so the JobManager / TaskManagers are potentially affected by this as well. Probably the data transfer between the TMs doesn't call the same methods as the UI on the JobManager :) The simplest solution is to shade the Netty in your user jar into a

Re: Difference between partition and groupBy

2017-02-23 Thread Fabian Hueske
Hi Patrick, as Robert said, partitionBy() shuffles the data such that all records with the same key end up in the same partition. That's all it does. groupBy() also prepares the data in each partition to be processed per key. For example, if you run a groupReduce after a groupBy(), the data is
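
A small sketch of that difference on the DataSet API (keys and values are illustrative): partitionByHash only routes records, while groupBy prepares them for per-key processing.

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class PartitionVsGroup {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<String, Integer>> data = env.fromElements(
                    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

            // partitionByHash: same key -> same partition, but keys may still
            // be interleaved within a partition.
            data.partitionByHash(0)
                .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Integer>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) {
                        int count = 0;
                        for (Tuple2<String, Integer> ignored : values) {
                            count++;
                        }
                        out.collect(count); // one count per parallel partition
                    }
                })
                .print();

            // groupBy: each partition is additionally organized so that a
            // groupReduce sees all records of one key together.
            data.groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Integer>> values,
                                       Collector<Tuple2<String, Integer>> out) {
                        String key = null;
                        int sum = 0;
                        for (Tuple2<String, Integer> t : values) {
                            key = t.f0;
                            sum += t.f1;
                        }
                        out.collect(Tuple2.of(key, sum));
                    }
                })
                .print();
        }
    }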

Re: Performance tuning

2017-02-23 Thread Robert Metzger
Hi Dmitry, Cool! Looks like you've taken the right approach to analyze the performance issues! Often the deserialization of the input is already a performance killer :) What is this one operator that is the bottleneck doing? Does it have a lot of state? Is it CPU intensive, or talking to an

Re: Difference between partition and groupBy

2017-02-23 Thread Robert Metzger
Hi Patrick, I think (but I'm not 100% sure) it's not a difference in what the engine does in the end, it's more of an API thing. When you are grouping, you can perform operations such as reducing afterwards. On a partitioned dataset, you can do stuff like processing each partition in parallel, or

Re: Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-02-23 Thread Greg Hogan
Miguel and Vasia, My thought is to change the example drivers to "print" verbose strings to the console, for example: Vertex ID: 0, vertex degree: 42, triangle count: 7, local clustering coefficient: 0.00406504 Output to CSV will still be the compact tuple representations which do not include

Re: Flink not reading from Kafka

2017-02-23 Thread Debasish Ghosh
Maybe .. I will try to log in to the machine directly and see .. regards. On Fri, Feb 24, 2017 at 2:05 AM, Robert Metzger wrote: > Hi, > > It is possible that the stdout file is not properly available in the > taskmanager UI. > I guess if you log into the machine directly

Re: Frontend classpath issue

2017-02-23 Thread Robert Metzger
Hi, since Flink 1.2, "per-job YARN applications" (when you do "-m yarn-cluster") include the job jar in the classpath as well. Does this change explain the behavior? On Thu, Feb 23, 2017 at 4:59 PM, Gyula Fóra wrote: > Hi, > > I have a problem that the frontend somehow seems

Re: Flink not reading from Kafka

2017-02-23 Thread Robert Metzger
Hi, it is possible that the stdout file is not properly available in the taskmanager UI. I guess if you log into the machine directly to get the stdout file, you'll find the output. On Thu, Feb 23, 2017 at 9:24 PM, Debasish Ghosh wrote: > Yes .. I was running Flink on

Re: Flink checkpointing gets stuck

2017-02-23 Thread Robert Metzger
Hi Shai, I think we don't have so many users running Flink on Azure. Maybe you are the first to put some heavy load onto that infrastructure using Flink. I would guess that your problems are caused by the same root cause, just the way the job is being cancelled is a bit different based on what is

Re: Flink not reading from Kafka

2017-02-23 Thread Debasish Ghosh
Yes .. I was running Flink on a DC/OS cluster. AFAIR I checked the taskmanager log from the Flink UI in Mesos. It said stdout was not available. But this may be due to the fact that Flink on DC/OS is not yet very stable .. regards. On Fri, Feb 24, 2017 at 1:41 AM, Robert Metzger

Re: Checkpointing with RocksDB as statebackend

2017-02-23 Thread Stephan Ewen
Hi Vinay! If you see that the memory usage is different when you checkpoint, it can be one of two things: (1) RocksDB needs to hold onto some snapshot state internally while the async background snapshot happens. That requires some memory. (2) There is often data buffered during the alignment of

Re: Flink not reading from Kafka

2017-02-23 Thread Robert Metzger
Hi Debasish, did you execute Flink in a distributed setting? print() will output the stream contents on stdout on the respective worker node (taskmanager), not on the machine that submitted the job. On Thu, Feb 23, 2017 at 5:41 PM, Debasish Ghosh wrote: > I was

Re: Cross operation on two huge datasets

2017-02-23 Thread Jain, Ankit
Hi Gwen, I would recommend looking into a data structure called an R-tree that is designed specifically for this use case, i.e. matching a point to a region. Thanks Ankit From: Fabian Hueske Date: Wednesday, February 22, 2017 at 2:41 PM To: Subject: Re:

Re: Performance tuning

2017-02-23 Thread Dmitry Golubets
Hi Robert, in the dev environment I load data via zipped CSV files from S3. Data is parsed into case classes. It's quite fast; I'm able to get ~80k/sec with only a source and a "dev/null" sink. Checkpointing is enabled with 1 hour intervals. Yes, one of the operators is a bottleneck and it

Re: Checkpointing with RocksDB as statebackend

2017-02-23 Thread vinay patil
Hi, when I disabled checkpointing the memory usage was similar for all nodes, which means that in the checkpointing-enabled case the data is first flushed to the memory of the CORE nodes (where the DataNode daemon is running in the case of EMR). I am going to run with FSStatebackend on a high end cluster with

Difference between partition and groupBy

2017-02-23 Thread Patrick Brunmayr
What is the basic difference between partitioning datasets by key and grouping them by key? Does it make a difference in terms of parallelism? Thx

Re: Flink not reading from Kafka

2017-02-23 Thread Debasish Ghosh
I was facing a similar problem yesterday. In my case print() was not working. Try adding a sink and writing the output to another Kafka topic. Something like
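
A minimal sketch of such a debug sink, assuming the Flink 1.2 Kafka 0.9 connector (broker address, topic name, and the fromElements source are illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class DebugToKafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c"); // your real source here

            // Instead of print(), write to a side topic you can tail with a console consumer.
            stream.addSink(new FlinkKafkaProducer09<String>(
                    "localhost:9092",   // broker list
                    "debug-output",     // target topic
                    new SimpleStringSchema()));

            env.execute("debug sink");
        }
    }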

Re: Performance tuning

2017-02-23 Thread Robert Metzger
Hi Dmitry, sorry for the late response. Where are you reading the data from? Did you check if one operator is causing backpressure? Are you using checkpointing? Serialization is often the cause for slow processing. However, it's very hard to diagnose potential other causes without any details on

Re: Flink not reading from Kafka

2017-02-23 Thread Robert Metzger
Hi Mohit, is there new data being produced into the topic? The properties.setProperty("auto.offset.reset", "earliest"); setting only applies if you haven't consumed anything in this consumer group. So if you have read all the data in the topic before, you won't see anything new showing up. On
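
A sketch of forcing a re-read from the beginning with a fresh consumer group, assuming the Flink 1.2 Kafka 0.9 connector (broker, topic, and group naming are illustrative):

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class FreshGroupConsumer {
        public static FlinkKafkaConsumer09<String> create() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            // auto.offset.reset only kicks in when the group has no committed
            // offsets, so a fresh group.id forces reading the topic from the start.
            props.setProperty("group.id", "my-job-" + System.currentTimeMillis());
            props.setProperty("auto.offset.reset", "earliest");
            return new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);
        }
    }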

Re: How to achieve exactly once on node failure using Kafka

2017-02-23 Thread Robert Metzger
Hi, exactly. You have to make sure that you can write data for the same ID multiple times. Exactly once in Flink is only guaranteed for registered state. So if you have a flatMap() with a "counter" variable, that is held in a "ValueState", this counter will always be in sync with the number of
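
A minimal sketch of such a ValueState-backed counter, to be used after a keyBy() (types and names are illustrative, assuming Flink 1.2's keyed state API):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts events per key; the counter lives in checkpointed ValueState, so
    // it is restored consistently after a failure (exactly-once for state).
    public class CountingFlatMap
            extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out)
                throws Exception {
            Long current = count.value();          // null on the first event for a key
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);
            out.collect(Tuple2.of(in.f0, updated));
        }
    }

Used as stream.keyBy(0).flatMap(new CountingFlatMap()); the sink side still has to be idempotent, as described above.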

Re: flink on yarn ha

2017-02-23 Thread Robert Metzger
Hi, this looks like a shading issue. Can you post on the mailing list the classpath the JobManager / AppMaster is logging on startup? It seems that Hadoop loads an unshaded version of the SecurityProtos. Maybe there is some Hadoop version mixup. Are you using a Hadoop distribution (like CDH or

Frontend classpath issue

2017-02-23 Thread Gyula Fóra
Hi, I have a problem that the frontend somehow seems to have the user jar on the classpath and it leads to a netty conflict: https://gist.github.com/gyfora/4ec2c8a8a6b33adb80d411460432ce8d So in the jobmanager logs I can see that my job started (running on YARN), but can't access the frontend,

Re: [test][ignore] Sending an email to user@flink without being subscribed ...

2017-02-23 Thread Robert Metzger
Please ignore these messages. I'll talk to the ASF infra about how we can resolve the issue. On Thu, Feb 23, 2017 at 4:54 PM, Robert Metzger wrote: > I'm testing what happens if I'm sending an email to the user@flink list > without being subscribed. > > On the dev@ list, moderators

[test][ignore] Sending an email to user@flink without being subscribed ...

2017-02-23 Thread Robert Metzger
I'm testing what happens if I'm sending an email to the user@flink list without being subscribed. On the dev@ list, moderators get an email in that case. I have the suspicion that you can post on the user@ list without subscribing first. We often have users that ask a question, we give an initial

List State in RichWindowFunction leads to RocksDb memory leak

2017-02-23 Thread Seth Wiesman
I am working on a program that uses a complex window and have run into some issues. It is a 1-hour window with 7 days of allowed lateness, including a custom trigger that gives us intermediate results every 5 minutes of processing time until the end of the 7 days of event time, when a final fire is
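
For readers unfamiliar with the pattern, a rough sketch of a window with that shape, with the built-in ContinuousProcessingTimeTrigger standing in for the custom trigger (data, timestamps, and the sum aggregation are illustrative):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

    public class LateWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            env.fromElements(Tuple2.of("key", 1000L), Tuple2.of("key", 2000L))
               .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                   @Override
                   public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                       return element.f1; // event timestamp carried in the tuple
                   }
               })
               .keyBy(0)
               .window(TumblingEventTimeWindows.of(Time.hours(1)))   // 1-hour event-time window
               .allowedLateness(Time.days(7))                        // keep window state for 7 days of lateness
               .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5))) // early firings every 5 minutes
               .sum(1)
               .print();

            env.execute("late window sketch");
        }
    }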

Re: Checkpointing with RocksDB as statebackend

2017-02-23 Thread vinay patil
Hi Stephan, anyway, the Async exception is gone. I have increased my instance type to r3.2xlarge, having 60GB of memory. But what I have observed here is that for two task managers the memory usage is close to 30GB while for the other two it goes up to 55GB; the load is equally distributed among all

Compilation Error in WindowStream.fold()

2017-02-23 Thread nsengupta
For reasons I cannot grasp, I am unable to move ahead. Here's the code: import org.apache.flink.api.common.functions.FoldFunction import

Re: Cross operation on two huge datasets

2017-02-23 Thread Xingcan Cui
Hi, @Gwen, sorry that I missed the cross function and showed you the wrong way. @Fabian's answers are what I mean. Considering that the cross function is so expensive, can we find a way to restrict the broadcast? That is, if the groupBy function is a many-to-one mapping, the cross function is an

Re: Flink jdbc

2017-02-23 Thread Punit Tandel
Hi, yes I have written a custom JDBC sink function based on the JDBCOutputFormat for streaming, and it's working and writing records to a Postgres DB or an H2 in-memory DB. However, I'm trying to figure out how many times the open method is called and establishes a database connection, because for my integration
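
On the open() question: it runs once per parallel subtask when the task starts, not once per record, so a sink with parallelism N opens N connections. A sketch of such a sink (URL, credentials, and table are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class JdbcSink extends RichSinkFunction<Tuple2<Integer, String>> {

        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            // called once per parallel subtask, before any invoke()
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/test", "user", "secret");
            statement = connection.prepareStatement("INSERT INTO t (id, name) VALUES (?, ?)");
        }

        @Override
        public void invoke(Tuple2<Integer, String> value) throws Exception {
            statement.setInt(1, value.f0);
            statement.setString(2, value.f1);
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        }
    }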

Re: Cross operation on two huge datasets

2017-02-23 Thread Fabian Hueske
Hi Gwen, sorry, I didn't read your answer; I was still writing mine when you sent yours ;-) Regarding your strategy, this is basically what Cross does: it keeps one input partitioned and broadcasts (replicates) the other one. On each partition, it combines the records of the partition of the first

Re: Cross operation on two huge datasets

2017-02-23 Thread Fabian Hueske
Hi, Flink's batch DataSet API does already support (manual) theta-joins via the CrossFunction. It combines each pair of records of two input data sets. This is done by broadcasting (and hence replicating) one of the inputs. @Xingcan, so I think what you describe is already there. However, as I
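
A minimal sketch of such a manual theta-join on the DataSet API (datasets and pairing logic are illustrative); crossWithTiny broadcasts the small input, while cross and crossWithHuge pick other replication strategies:

    import org.apache.flink.api.common.functions.CrossFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CrossSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> points = env.fromElements("p1", "p2");
            DataSet<String> zones = env.fromElements("z1", "z2", "z3");

            // The small zones set is replicated to every partition of points;
            // the CrossFunction then sees every (point, zone) pair.
            points.crossWithTiny(zones)
                  .with(new CrossFunction<String, String, Tuple2<String, String>>() {
                      @Override
                      public Tuple2<String, String> cross(String point, String zone) {
                          return Tuple2.of(point, zone);
                      }
                  })
                  .print();
        }
    }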

RE: Cross operation on two huge datasets

2017-02-23 Thread Gwenhael Pasquiers
Hi and thanks for your answers! I’m not sure I can define any index to split the workload since in my case any point could be in any zone... I think I’m currently trying to do it the way you call “theta-join”: 1- Trying to split one dataset over the cluster and prepare it for work

RE: Flink checkpointing gets stuck

2017-02-23 Thread Shai Kaplan
And now it's happening again -Original Message- From: Shai Kaplan [mailto:shai.kap...@microsoft.com] Sent: Wednesday, February 22, 2017 12:02 PM To: user@flink.apache.org Subject: RE: Flink checkpointing gets stuck I changed the checkpoint interval to 30 minutes, and also switched