Re: Yahoo! Streaming Benchmark with Flink
Thanks Till, your reply answered my questions perfectly.

Regards,
Eric

On Fri, Oct 28, 2016 at 11:00 AM, Till Rohrmann wrote:
> Hi Eric,
>
> Concerning your first question: I think that AdvertisingTopologyFlinkStateHighKeyCard models a different scenario where one tries to count the number of ads per campaign for a large number of campaigns. In this scenario, the input data already contains the campaign id for each ad. I think this is the job for the paragraph "Winning Twitter Hack Week: Eliminating the key-value store bottleneck".
>
> Concerning your second question: The response actor is registered at the registration service. The registration service exposes the Akka URL of this actor under the index of the running task. When you run AkkaStateQuery, the registration service is queried to retrieve the Akka URL, and then a query-state request is sent to the response actor via the QueryActor. That is how the actor comes into play.
>
> At the moment the registration service is implemented using ZooKeeper. This means that the Akka URL is written to ZooKeeper, from where it can be retrieved.
>
> I hope this answers your questions.
>
> Cheers,
> Till
>
> On Fri, Oct 28, 2016 at 2:47 AM, Eric Fukuda wrote:
>> Hi,
>>
>> I have two questions on the blog post on the Yahoo! Streaming Benchmark with Flink [1].
>>
>> First is about the join operation to associate ad_ids and campaign_ids. In flink.benchmark.state.AdvertisingTopologyFlinkStateHighKeyCard, I don't see this being done. Is there a reason for this?
>>
>> Second is about the Akka actor. Reading flink.benchmark.state.QueryableWindowOperator or flink.benchmark.state.QueryableWindowOperatorEvicting, it looks like the Akka actor is being prepared but not used in the actual processing (processElement()). Is this correct? And how do I enable Akka in the job?
>>
>> [1] http://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>>
>> Thanks,
>> Eric
Flink on YARN - Fault Tolerance | use case supported or not
Hi All,

I tried testing fault tolerance of my running Flink application in a different way (not sure if it is an appropriate way). I ran the application on YARN and, after a few checkpoints had completed, killed the YARN application using:

yarn application -kill application_1476277440022_

I then tried restarting the application, pointing it at the same checkpoint directory. The application started afresh and did not resume from the last checkpointed state. I just wanted to find out whether fault tolerance is expected to work in this use case. If yes, what am I doing wrong?

I'm aware of the savepoint process (create a savepoint, stop the application, and resume a new application from that savepoint), but I wanted to check the above use case: if the YARN application gets killed, perhaps accidentally or for any other reason, is this kind of fault tolerance supported or not?

Regards,
Anchit
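For reference, the savepoint-based restart mentioned above roughly follows these Flink CLI steps; this is only a sketch of the standard workflow, the job ID, savepoint path, and jar name are placeholders, and a YARN setup may need additional options:

    # 1. Trigger a savepoint for the running job
    bin/flink savepoint <jobId>

    # 2. Stop the running application once the savepoint has been written
    bin/flink cancel <jobId>

    # 3. Start a new run of the application from that savepoint
    #    (add your usual YARN submission options)
    bin/flink run -s <savepointPath> your-application.jar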
Flink Metrics - InfluxDB + Grafana | Help with InfluxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation
Hi All,

I'm trying to plot Flink application metrics using Grafana backed by InfluxDB. I need to plot/monitor 'numRecordsIn' and 'numRecordsOut' for each operator/operation, but I'm finding it hard to build the InfluxDB query in Grafana that produces this plot. I am able to plot 'numRecordsIn' and 'numRecordsOut' for each subtask (parallelism set to 50) of an operator, but not for the operator as a whole.

If somebody has successfully implemented this kind of plot on Grafana backed by InfluxDB, please share the process/query to achieve it. Below is the query I currently use to monitor 'numRecordsIn' and 'numRecordsOut' for each subtask:

SELECT derivative(sum("count"), 10s) FROM "numRecordsOut"
WHERE "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter
GROUP BY time(10s), "task_name"

PS: $subtask is the templating variable I'm using in order to select multiple subtask values. I have tried the 'All' option for this templating variable, but it gives an incorrect plot with negative values, whereas selecting individual subtask values from the templating-variable dropdown yields correct results.

Thank you!

Regards,
Anchit
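One possible variant, sketched from the query above under the assumption that the measurement and tag names match this setup: drop the subtask filter so sum("count") aggregates the counter over all subtasks of the task per interval, and use non_negative_derivative to suppress the negative spikes that counter resets can cause:

SELECT non_negative_derivative(sum("count"), 10s) FROM "numRecordsOut"
WHERE "task_name" = 'Source: Reading from Kafka' AND $timeFilter
GROUP BY time(10s), "task_name"

Whether this behaves well also depends on every subtask reporting a point in each interval; gaps within a 10s bucket will show up as dips in the summed rate.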
Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity
Hi Aljoscha,

I am using the custom trigger with the GlobalWindows window assigner. Do I still need to override the clear() method and delete the processing-time timer using triggerContext.deleteProcessingTimeTimer(prevTime)?

Regards,
Anchit
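For reference, a minimal sketch of a processing-time session trigger on GlobalWindows with a clear() override, written against the Flink 1.1-era Trigger API; the SessionTrigger name, the timeout handling, and the "lastTimer" state are illustrative assumptions, not the actual trigger from this thread:

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

class SessionTrigger(sessionTimeout: Long) extends Trigger[Object, GlobalWindow] {

  // Per-key/per-window state holding the last registered timer, so that
  // clear() can delete it when the window is purged.
  private val stateDesc =
    new ValueStateDescriptor[java.lang.Long]("lastTimer", classOf[java.lang.Long], null)

  override def onElement(element: Object, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val lastTimer = ctx.getPartitionedState(stateDesc)
    // Push the session timeout forward on every element.
    if (lastTimer.value() != null) {
      ctx.deleteProcessingTimeTimer(lastTimer.value())
    }
    val newTimer = System.currentTimeMillis() + sessionTimeout
    ctx.registerProcessingTimeTimer(newTimer)
    lastTimer.update(newTimer)
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Even with GlobalWindows, clear() removes any pending timer and state
  // so nothing lingers once the window is purged.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = {
    val lastTimer = ctx.getPartitionedState(stateDesc)
    if (lastTimer.value() != null) {
      ctx.deleteProcessingTimeTimer(lastTimer.value())
    }
    lastTimer.clear()
  }
}

// usage (sketch):
// keyedStream.window(GlobalWindows.create()).trigger(new SessionTrigger(30000L)).apply(...)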
Elasticsearch sink: java.lang.NoSuchMethodError: org.elasticsearch.common.settings.Settings.settingsBuilder
Hello,

I am using Flink to write data to Elasticsearch.

Flink version: 1.1.3
Elasticsearch version: 2.4.1

But I am getting the following error:

10/28/2016 18:58:56  Job execution switched to status FAILING.
java.lang.NoSuchMethodError: org.elasticsearch.common.settings.Settings.settingsBuilder()Lorg/elasticsearch/common/settings/Settings$Builder;
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:162)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)

This is the code I use to configure the sink (similar to the taxi ride example in https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana):

private void elasticSink() {
    Map<String, String> config = new HashMap<>();
    // This instructs the sink to emit after every element, otherwise they would be buffered
    config.put("bulk.flush.max.actions", "10");
    config.put("cluster.name", "elasticdemo");

    List<InetSocketAddress> transports = new ArrayList<>();
    try {
        transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9200));
    } catch (UnknownHostException ex) {
        Logger.getLogger(CEPEngine.class.getName()).log(Level.SEVERE, null, ex);
    }

    stream.addSink(new ElasticsearchSink<>(config, transports, new AccessDataInsert()));
}

What could be the problem?

Regards,
Pedro Chaves
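A NoSuchMethodError like this often points to an Elasticsearch client on the classpath that does not match the version the connector was compiled against. A sketch of explicitly pinning the matching dependencies, assuming a Maven build and Scala 2.11 artifacts (the suffix and versions should be adjusted to the actual setup):

<!-- Flink Elasticsearch 2.x connector; the _2.11 suffix is an assumption -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.11</artifactId>
    <version>1.1.3</version>
</dependency>

<!-- Pin the Elasticsearch client to the cluster version in use -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.4.1</version>
</dependency>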
A custom FileInputFormat
Hello Flink community,

I am running into an issue with a custom FileInputFormat class and would appreciate your help. My goal is to read all files from a directory as paths:

val env : ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
var source : DataSet[String] = env.readFile(new DirReader, "/tmp/mydir").setParallelism(1)
source.writeAsText("/tmp/results", WriteMode.OVERWRITE)
env.execute("Job")

It works when I execute the program from within my IDE or run it directly as a fat jar. When I run it through the Flink CLI, the file "/tmp/results" is created but not filled with entries. There seems to be something wrong with my custom DirReader (see below). The output of the println statements is not visible when running the code from the Flink CLI, and no exception is stated in the logs (see below). I am at a loss as to what to try. Even worse, when I copy the fat jar to a remote system, the problem also appears when I execute the fat jar directly.

Local system:  Flink 1.0.2, Java 1.8.0_102, Scala 2.11.8
Remote system: Flink 1.1.3, Java 1.8.0_92, Scala 2.11.6

Help or ideas to try out are welcome!

Best,
Niklas

import java.io.File
import org.apache.flink.api.common.io.FileInputFormat

class DirReader extends FileInputFormat[String] {
  var running : Boolean = false
  var fileList : Array[String] = null

  override def openInputFormat() = {
    println("Path: " + this.filePath.toString)
    val directory = new File(this.filePath.toString)
    if (directory != null && directory.isDirectory) {
      fileList = directory.listFiles.filter(_.isDirectory).map(_.listFiles).flatten
        .map(_.toString)
      running = if (fileList.length > 1) true else false
    }
    println("fileList " + fileList.length + " running " + running)
  }

  override def nextRecord(reuse: String): String = {
    val head = fileList.head
    println("File: " + head)
    fileList = fileList.tail
    running = if (fileList.length == 0) false else true
    head
  }

  override def reachedEnd(): Boolean = ! running
}

The output from the CLI:

10/28/2016 18:27:56 Job execution switched to status RUNNING.
10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to SCHEDULED
10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to DEPLOYING
10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to RUNNING
10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to SCHEDULED
10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to DEPLOYING
10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to FINISHED
10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to RUNNING
10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to FINISHED
10/28/2016 18:27:56 Job execution switched to status FINISHED.

--
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/
Re: Yahoo! Streaming Benchmark with Flink
Hi Eric,

Concerning your first question: I think that AdvertisingTopologyFlinkStateHighKeyCard models a different scenario where one tries to count the number of ads per campaign for a large number of campaigns. In this scenario, the input data already contains the campaign id for each ad. I think this is the job for the paragraph "Winning Twitter Hack Week: Eliminating the key-value store bottleneck".

Concerning your second question: The response actor is registered at the registration service. The registration service exposes the Akka URL of this actor under the index of the running task. When you run AkkaStateQuery, the registration service is queried to retrieve the Akka URL, and then a query-state request is sent to the response actor via the QueryActor. That is how the actor comes into play.

At the moment the registration service is implemented using ZooKeeper. This means that the Akka URL is written to ZooKeeper, from where it can be retrieved.

I hope this answers your questions.

Cheers,
Till

On Fri, Oct 28, 2016 at 2:47 AM, Eric Fukuda wrote:
> Hi,
>
> I have two questions on the blog post on the Yahoo! Streaming Benchmark with Flink [1].
>
> First is about the join operation to associate ad_ids and campaign_ids. In flink.benchmark.state.AdvertisingTopologyFlinkStateHighKeyCard, I don't see this being done. Is there a reason for this?
>
> Second is about the Akka actor. Reading flink.benchmark.state.QueryableWindowOperator or flink.benchmark.state.QueryableWindowOperatorEvicting, it looks like the Akka actor is being prepared but not used in the actual processing (processElement()). Is this correct? And how do I enable Akka in the job?
>
> [1] http://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>
> Thanks,
> Eric
Re: Flushing the result of a groupReduce to a Sink before all reduces complete
Hi Fabian,

We have reworked our execution to remove the groupReduce step and replaced it with a mapPartition, and we're seeing data passing through more immediately now. Thanks for your quick reply, it was very useful.

Regards,
Paul

On 26 October 2016 at 19:57, Fabian Hueske wrote:
> Hi Paul,
>
> Flink pushes the results of operators (including GroupReduce) to the next operator or sink as soon as they are computed, so what you are asking for is actually happening. However, before the GroupReduceFunction can be applied, the whole data set is sorted in order to group it. This step is usually more expensive than applying the GroupReduceFunction, which is why the output looks batched.
>
> Flink only supports sort-based grouping; hash-based grouping would not help either, because Flink would not know when to close a group until all data is consumed.
>
> Please let me know if you have further questions.
>
> Best, Fabian
>
> 2016-10-26 19:07 GMT+02:00 Paul Wilson:
>> Hi,
>>
>> DataSet API, Flink 1.1.3
>>
>> I have an application where I'd like to perform some mapping before batching the results and passing them to the sink. I'm performing a 'composite' key selection to group the items by their natural key as well as a batch (itemCount / batchSize). When I reduce the batches and pass them to the sink, the whole flow waits for all reduces to complete before passing anything to the sink.
>>
>> Is there some way that the results of a single group reduce can be passed to the sink before all reduces are complete?
>>
>> Hope that makes sense,
>> Regards,
>> Paul
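For illustration, a minimal sketch of the mapPartition-based batching described above; the Record type, the sample data, and the batch size are assumptions rather than the poster's actual code:

import org.apache.flink.api.scala._

case class Record(key: String, value: String)

object BatchingJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val records: DataSet[Record] = env.fromElements(
      Record("a", "1"), Record("a", "2"), Record("b", "3"))

    // Batch elements as they stream through each partition; no sort/grouping
    // phase is needed, so batches can reach the sink before all input is consumed.
    val batchSize = 2
    val batches: DataSet[Seq[Record]] =
      records.mapPartition { it: Iterator[Record] =>
        it.grouped(batchSize).map(_.toSeq)
      }

    batches.writeAsText("/tmp/batches")
    env.execute("mapPartition batching sketch")
  }
}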
Re: Can we do batch writes on cassandra using flink while leveraging the locality?
The Spark Cassandra connector does this, but I don't think it really implements a custom partitioner. I think it just leverages the token-aware policy and by default does batch writes within a partition, though you can also batch across partitions that share the same replica.

On Thu, Oct 27, 2016 at 8:41 AM, Shannon Carey wrote:
> It certainly seems possible to write a Partitioner that does what you describe. I started implementing one but didn't have time to finish it. I think the main difficulty is in properly dealing with partition ownership changes in Cassandra… if you are maintaining state in Flink and the partitioning changes, your job might produce inaccurate output. If, on the other hand, you are only using the partitioner just before the output, dynamic partitioning changes might be ok.
>
> From: kant kodali
> Date: Thursday, October 27, 2016 at 3:17 AM
> To:
> Subject: Can we do batch writes on cassandra using flink while leveraging the locality?
>
> locality? For example the batch writes in Cassandra will put pressure on the coordinator, but since the connectors are built by leveraging the locality I was wondering if we could do a batch of writes on the node where the batch belongs?
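As a rough illustration of the Partitioner idea discussed above (a sketch only: the replica lookup is left out, and the class name, key type, and usage are hypothetical):

import org.apache.flink.api.common.functions.Partitioner

// Sketch: route records with the same Cassandra partition key to the same
// Flink channel, so a sink subtask can batch writes that belong together.
// A real replica-aware version would consult the Cassandra driver's token
// metadata instead of this placeholder hash.
class ReplicaAwarePartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)
}

// Hypothetical usage on a DataStream[Event] with a partitionKey field:
// stream.partitionCustom(new ReplicaAwarePartitioner, _.partitionKey)
//       .addSink(new CassandraBatchSink)   // hypothetical sink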