SQL Client (Streaming + Checkpoint)
It looks like the SQL client does not enable checkpointing when submitting a streaming query. Did anyone notice this behavior? FYI, I am using the 1.6.x branch. Regards, Vijay
Re: Support for multiple HDFS
Hi Ted, I believe HDFS-6584 is more of an HDFS feature supporting the archive use case through policy configuration. My situation is that I have two distinct HCFS file systems which are independent, but the Flink job will decide which one to use for its sink, while the Flink infrastructure is by default configured with one of these HCFS as the state backend store. Hope this helps. Regards, Vijay

On Wednesday, August 23, 2017 11:06 AM, Ted Yu <yuzhih...@gmail.com> wrote: Would HDFS-6584 help with your use case?

On Wed, Aug 23, 2017 at 11:00 AM, Vijay Srinivasaraghavan <vijikar...@yahoo.com.invalid> wrote:
> Hello,
> Is it possible for a Flink cluster to use multiple HDFS repositories (HDFS-1 for managing the Flink state backend, HDFS-2 for syncing results from the user job)?
> The scenario can be viewed in the context of running jobs that push their results to an archive repository (cold storage).
> Since the Hadoop configuration is static, I am thinking it is hard to achieve this, but I could be wrong.
> Please share any thoughts.
> Regards, Vijay
Support for multiple HDFS
Hello, Is it possible for a Flink cluster to use multiple HDFS repositories (HDFS-1 for managing the Flink state backend, HDFS-2 for syncing results from the user job)? The scenario can be viewed in the context of running jobs that push their results to an archive repository (cold storage). Since the Hadoop configuration is static, I am thinking it is hard to achieve this, but I could be wrong. Please share any thoughts. Regards, Vijay
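Since Flink resolves a file system by the scheme and authority of each URI rather than by `fs.defaultFS` alone, one pattern worth trying is to pin the state backend to one cluster in `flink-conf.yaml` and have the job's sink address the second cluster with fully qualified URIs. This is only a sketch — the host names and paths below are illustrative, and the behavior should be verified against your Hadoop client configuration:

```yaml
# flink-conf.yaml (sketch): state backend pinned to the first HDFS cluster
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode-1:8020/flink/checkpoints

# The user job can still address the second cluster explicitly, e.g. a sink
# writing to hdfs://namenode-2:8020/archive/results, because that URI carries
# its own authority instead of relying on fs.defaultFS.
```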
State Backend
Hello, I would like to know if there are any latency requirements to consider when choosing an appropriate state backend. For example, if an HCFS implementation is used as the Flink state backend (instead of stock HDFS), are there any performance implications one needs to know about?

- Frequency of read/write operations, random vs sequential reads
- Load/usage pattern (frequent small updates vs bulk operations)
- RocksDB -> HCFS (is this the recommended option to mitigate some of the challenges outlined above?)
- S3 vs HDFS: any performance numbers?

Appreciate any inputs on this. Regards, Vijay
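For context on the RocksDB -> HCFS option above: with the RocksDB backend, working state lives on local disk and only checkpoint snapshots go to the distributed file system, so the DFS mostly sees sequential bulk writes rather than frequent small updates. A minimal `flink-conf.yaml` sketch of that setup (paths are illustrative):

```yaml
# flink-conf.yaml (sketch): RocksDB keeps hot state on local disk;
# the HCFS/HDFS path only receives checkpoint snapshots.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
```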
ElasticsearchSink Serialization Error
Hello, I am seeing the error below when I try to use ElasticsearchSink. It complains about serialization, and the trail leads to the "IndexRequestBuilder" implementation. I tried the suggestion mentioned in http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability (changed from an anonymous class to a concrete class) but it did not help. However, when I call "ElasticsearchSink<>(config, transports, null)", passing "null" for the "IndexRequestBuilder", I don't see the serialization error. This suggests the problem is with the IndexRequestBuilder implementation, but I am not able to get further. Could someone please let me know the right way to use the ElasticsearchSink() API?

Build details: Flink 1.2.0, Elasticsearch 5.3.0

Error message:

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)

Code snippet:

```
private ElasticsearchSink<Result> sinkToElasticSearch(AppConfiguration appConfiguration) throws Exception {
    String host = appConfiguration.getPipeline().getElasticSearch().getHost();
    int port = appConfiguration.getPipeline().getElasticSearch().getPort();
    String cluster = appConfiguration.getPipeline().getElasticSearch().getCluster();

    Map<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", cluster);

    List<InetSocketTransportAddress> transports = new ArrayList<>();
    transports.add(new InetSocketTransportAddress(host, port));

    return new ElasticsearchSink<>(config, transports, new ResultIndexRequestBuilder(appConfiguration));
}

public class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>, Serializable {

    private String index;
    private String type;
    //private transient Gson gson = new Gson();

    public ResultIndexRequestBuilder() {}

    public ResultIndexRequestBuilder(AppConfiguration appConfiguration) {
        index = appConfiguration.getPipeline().getElasticSearch().getIndex();
        type = appConfiguration.getPipeline().getElasticSearch().getType();
    }

    @Override
    public IndexRequest createIndexRequest(Result result, RuntimeContext ctx) {
        Gson gson = new Gson();
        String resultAsJson = gson.toJson(result);
        System.out.println(resultAsJson);

        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("data", resultAsJson);

        return Requests.indexRequest()
                .index(index)
                .type(type)
                .source(jsonMap);
    }
}
```

Regards, Vijay
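The closure cleaner's complaint can usually be reproduced outside Flink with plain Java serialization. The sketch below (all class names are made up for illustration) shows why a builder that holds a reference to a non-serializable object such as a configuration bean fails, while one that copies out only the plain String fields it needs up front — as the snippet above does — passes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    // Hypothetical stand-in for a non-serializable configuration object.
    static class AppConfig { }

    // Fails to serialize: holds a reference to the non-serializable AppConfig.
    static class BadBuilder implements Serializable {
        final AppConfig config = new AppConfig();
    }

    // Serializes fine: keeps only serializable String fields extracted up front.
    static class GoodBuilder implements Serializable {
        final String index;
        GoodBuilder(String index) { this.index = index; }
    }

    // Same check Flink's closure cleaner effectively performs: try to
    // round-trip the object through Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException is an IOException
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BadBuilder()));       // false
        System.out.println(isSerializable(new GoodBuilder("idx"))); // true
    }
}
```

Running this kind of check locally, before submitting the job, pinpoints which field trips the cleaner.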
Re: Reliable Distributed FS support (HCFS)
Following up on my question regarding the backing file system (HCFS) requirements. Appreciate any inputs.

---
Regarding the file system abstraction support, we are planning to use a distributed file system which complies with the Hadoop Compatible File System (HCFS) standard in place of standard HDFS. According to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html), persistence guarantees are listed as one of the main requirements, and to be precise this covers both visibility and durability guarantees. My questions are:

1) Do we expect the file system to support "atomic rename" semantics? I believe the checkpoint mechanism involves renaming files; will there be an impact if atomic rename is not guaranteed by the underlying file system?

2) How does one certify Flink with an HCFS (in place of standard HDFS) in terms of the scenarios/use cases that need to be tested? Is there any general guidance on this?
---

Regards, Vijay
Reliable Distributed FS support (HCFS)
Hello, Regarding the file system abstraction support, we are planning to use a distributed file system which complies with the Hadoop Compatible File System (HCFS) standard in place of standard HDFS. According to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html), persistence guarantees are listed as one of the main requirements, and to be precise this covers both visibility and durability guarantees. My questions are:

1) Do we expect the file system to support "atomic rename" semantics? I believe the checkpoint mechanism involves renaming files; will there be an impact if atomic rename is not guaranteed by the underlying file system?

2) How does one certify Flink with an HCFS (in place of standard HDFS) in terms of the scenarios/use cases that need to be tested? Is there any general guidance on this?

Thanks, Vijay
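For context on question 1: "atomic rename" means a file written under a temporary name becomes visible under its final name in a single step, with no window in which readers observe a partial file — the pending-then-commit pattern file sinks rely on. A local-filesystem sketch of the semantics using `java.nio` (file names are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicRenameDemo {
    public static void main(String[] args) throws IOException {
        // Write the output under a "pending" name, mimicking a file sink.
        Path pending = Files.createTempFile("part-0.", ".pending");
        Files.write(pending, "record-1\nrecord-2\n".getBytes());

        // Commit: one atomic rename publishes the complete file; readers
        // never see a half-written "committed" file.
        Path committed = pending.resolveSibling(pending.getFileName() + ".committed");
        Files.move(pending, committed, StandardCopyOption.ATOMIC_MOVE);

        System.out.println(Files.exists(pending));   // false
        System.out.println(Files.exists(committed)); // true
        Files.delete(committed);
    }
}
```

On a file system without this guarantee, the "rename" degrades to copy-then-delete, opening exactly the partial-visibility window the question is about.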
Re: Data Transfer between TM should be encrypted
Hi Vinay, There are some delays and we expect the PR to be created next week. Regards, Vijay

On Wednesday, September 14, 2016 5:41 PM, vinay patil <vinay18.pa...@gmail.com> wrote: Hi Vijay, Did you raise the PR for this task? I don't mind testing it out as well. Regards, Vinay Patil

On Tue, Aug 30, 2016 at 6:28 PM, Vinay Patil <[hidden email]> wrote: Hi Vijay, That's good news for me. Eagerly waiting for this change so that I can integrate and test it before going live. Regards, Vinay Patil

On Tue, Aug 30, 2016 at 4:06 PM, Vijay Srinivasaraghavan [via Apache Flink User Mailing List archive.] <[hidden email]> wrote: Hi Stephan, The dev work is almost complete except the YARN mode deployment stuff that needs to be patched. We are expecting to send a PR in a week or two. Regards, Vijay

On Tuesday, August 30, 2016 12:39 AM, Stephan Ewen <[hidden email]> wrote: Let me loop in Vijay, I think he is the one working on this and can probably give the best estimate of when it can be expected. @vijay: For the SSL/TLS transport encryption, do you have an estimate for the timeline of that feature?

On Mon, Aug 29, 2016 at 8:54 PM, vinay patil <[hidden email]> wrote: Hi Stephan, Thank you for your reply. When can I expect this feature to be integrated in master or a release version? We are going to get production data (financial data) at the end of October, so we want to have this feature before that. Regards, Vinay Patil

On Mon, Aug 29, 2016 at 11:15 AM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote: Hi! The way that the JIRA issue you linked will achieve this is by hooking into the network stream pipeline directly and encrypting the raw network byte stream. We built the network stack on Netty, and will use Netty's SSL/TLS handlers for that. That should be much more efficient than manual encryption/decryption in each user function. Stephan

On Mon, Aug 29, 2016 at 6:12 PM, vinay patil <[hidden email]> wrote: Hi Ufuk, This is regarding the issue https://issues.apache.org/jira/browse/FLINK-4404. How can we achieve this? I am able to decrypt the data coming in from Kafka, but I want to make sure that the data is encrypted when flowing between TMs. One approach I can think of is to decrypt the data at the start of each operator and encrypt it at the end of each operator, but I feel this is not an efficient approach. I just want to check if there are alternatives to this and whether it can be achieved by doing some configuration. Regards, Vinay Patil
Re: Data Transfer between TM should be encrypted
Hi Stephan, The dev work is almost complete except the YARN mode deployment stuff that needs to be patched. We are expecting to send a PR in a week or two. Regards, Vijay

On Tuesday, August 30, 2016 12:39 AM, Stephan Ewen wrote: Let me loop in Vijay, I think he is the one working on this and can probably give the best estimate of when it can be expected. @vijay: For the SSL/TLS transport encryption, do you have an estimate for the timeline of that feature?

On Mon, Aug 29, 2016 at 8:54 PM, vinay patil wrote: Hi Stephan, Thank you for your reply. When can I expect this feature to be integrated in master or a release version? We are going to get production data (financial data) at the end of October, so we want to have this feature before that. Regards, Vinay Patil

On Mon, Aug 29, 2016 at 11:15 AM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote: Hi! The way that the JIRA issue you linked will achieve this is by hooking into the network stream pipeline directly and encrypting the raw network byte stream. We built the network stack on Netty, and will use Netty's SSL/TLS handlers for that. That should be much more efficient than manual encryption/decryption in each user function. Stephan

On Mon, Aug 29, 2016 at 6:12 PM, vinay patil <[hidden email]> wrote: Hi Ufuk, This is regarding the issue https://issues.apache.org/jira/browse/FLINK-4404. How can we achieve this? I am able to decrypt the data coming in from Kafka, but I want to make sure that the data is encrypted when flowing between TMs. One approach I can think of is to decrypt the data at the start of each operator and encrypt it at the end of each operator, but I feel this is not an efficient approach. I just want to check if there are alternatives to this and whether it can be achieved by doing some configuration. Regards, Vinay Patil
Re: Flink WebUI on YARN behind firewall
Hi Trevor, I am seeing a similar issue for a JIRA that I am working on now. I have yet to trace the YARN web UI code to find out how the "tracking URL" is being handled. To unblock yourself, you could use the tracking URL (the Flink UI URL) directly to access the Flink web UI, bypassing the YARN UI redirection. You can find the tracking URL in the JobManager log file from the YARN container. Regards, Vijay

On Friday, August 26, 2016 10:52 AM, Trevor Grant wrote: I decided it made the most sense to open up a new thread. I am running Flink on a cluster behind a firewall. Things seem to be working fine, but when I access the YARN web UI and click on the Flink application UI, I get the JobManager UI, but it is broken. There is a broken link to a Flink image and

Apache Flink Dashboard - Overview - Running Jobs - Completed Jobs - Task Managers - Job Manager - Submit new Job

but none of the links work: no images, no pretty formatting. Does anyone have a quick idea why this would be? I found http://mail-archives.apache.org/mod_mbox/flink-user/201511.mbox/%3CCAGr9p8B_aKhbjLqVQQUZeO_eSz6P=ewkp+kg1sq65sb0nps...@mail.gmail.com%3E but that seems to be more related to running jobs than accessing the UI. Any help would be appreciated.

tg

Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

"Fortunate is he, who is able to know the causes of things." -Virgil
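To find the tracking URL mentioned above without clicking through the YARN proxy, one option is to grep the JobManager container log for the address the web frontend announces on startup. A self-contained sketch — the log line below is illustrative, not Flink's exact wording:

```shell
# Create a sample log line resembling the JobManager's startup message
# (wording is illustrative), then extract the URL from it.
printf 'INFO  Web frontend listening at http://10.0.0.12:43581\n' > jobmanager.log
grep -oE 'http://[^ ]+' jobmanager.log
```

Against a real deployment you would run the `grep` on the actual JobManager log fetched from the YARN container.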
Re: Checkpoint
Thanks Ufuk and Stephan. I have added an identity mapper and disabled chaining. With that, I am able to see the backpressure alert on the identity mapper task. I have noticed that when I introduce a delay (sleep) on the subsequent task, sometimes checkpointing is not working. I can see the checkpoint trigger, but the files are not moved from the "pending" state. I will try to reproduce this to find the pattern, but are you aware of any such scenario? Regards, Vijay

On Thursday, March 10, 2016 2:51 AM, Stephan Ewen <se...@apache.org> wrote: Just to be sure: Is the task whose backpressure you want to monitor the Kafka source? There is an open issue that backpressure monitoring does not work for the Kafka source: https://issues.apache.org/jira/browse/FLINK-3456 To circumvent that, add an "IdentityMapper" after the Kafka source, make sure it is non-chained, and monitor the backpressure on that MapFunction. Greetings, Stephan

On Thu, Mar 10, 2016 at 11:23 AM, Robert Metzger <rmetz...@apache.org> wrote: Hi Vijay, regarding your other questions:

1) On the TaskManagers, the FlinkKafkaConsumers will write the partitions they are going to read in the log. There is currently no way of seeing the state of a checkpoint in Flink (which is the offsets). However, once a checkpoint is completed, the Kafka consumer commits the offset to the Kafka broker. (I could not find a tool to get the committed offsets from the broker, but it's either stored in ZK or in a special topic by the broker. In Kafka 0.8 that's easily doable with kafka.tools.ConsumerOffsetChecker.)

2) Do you see duplicate data written by the rolling file sink? Or do you see it somewhere else? HDP 2.4 is using Hadoop 2.7.1, so the truncate() of invalid data should actually work properly.

On Thu, Mar 10, 2016 at 10:44 AM, Ufuk Celebi <u...@apache.org> wrote: How many vertices does the web interface show and what parallelism are you running? If the sleeping operator is chained you will not see anything.
If your goal is to just see some back pressure warning, you can call env.disableOperatorChaining() and re-run the program. Does this work? – Ufuk

On Thu, Mar 10, 2016 at 1:35 AM, Vijay Srinivasaraghavan <vijikar...@yahoo.com> wrote:
> Hi Ufuk,
>
> I have increased the sampling size to 1000 and decreased the refresh
> interval by half. Into my Kafka topic I have pumped a million messages, which are
> read by the KafkaConsumer pipeline and passed to a transformation step
> where I have introduced a sleep (3 sec) for every single message received, and
> the final step is an HDFS sink using the RollingSink API.
>
> jobmanager.web.backpressure.num-samples: 1000
> jobmanager.web.backpressure.refresh-interval: 3
>
> I was hoping to see the backpressure tab in the UI display some warning, but
> I still see the "OK" message.
>
> This makes me wonder whether I am testing the backpressure scenario properly.
>
> Regards
> Vijay
>
> On Monday, March 7, 2016 3:19 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Vijay!
>
> On Mon, Mar 7, 2016 at 8:42 PM, Vijay Srinivasaraghavan
> <vijikar...@yahoo.com> wrote:
>> 3) How can I simulate and verify backpressure? I have introduced some delay
>> (Thread.sleep) in the job before the sink, but the "backpressure" tab in the UI
>> does not show any indication of whether backpressure is working or not.
>
> If a task is slow, it back pressures upstream tasks, e.g. if your
> transformations have the sleep, the sources should be back pressured.
> It can happen that even with the sleep the tasks still produce their
> data as fast as they can and hence no back pressure is indicated in
> the web interface. You can increase the sleep to check this.
>
> The mechanism used to determine back pressure is based on sampling the
> stack traces of running tasks. You can increase the number of samples
> and/or decrease the delay between samples via the config parameters shown
> in [1].
> It can happen that the samples miss the back pressure
> indicators, but usually the defaults work fine.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#jobmanager-web-frontend
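The sampling knobs quoted above, together with the delay between individual samples, are all set in flink-conf.yaml. A sketch consolidating them — the values are the ones used in this thread, not recommendations, and the exact key set should be checked against the config reference for your Flink version:

```yaml
# flink-conf.yaml (sketch): back pressure sampling for the web frontend
jobmanager.web.backpressure.num-samples: 1000          # stack-trace samples per task
jobmanager.web.backpressure.refresh-interval: 3        # value taken from the thread
jobmanager.web.backpressure.delay-between-samples: 50  # ms between consecutive samples
```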