Re: Blob server not working with 1.4.0.RC2
Hi Nico,

I think there were changes in the default port for the BLOB server. I missed the fact that the Kubernetes configuration was still exposing 6124 for the JobManager BLOB server.

Thanks
Bernd

-----Original Message-----
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: Monday, December 4, 2017 14:17
To: Winterstein, Bernd; user@flink.apache.org
Subject: Re: Blob server not working with 1.4.0.RC2

Hi Bernd,
thanks for the report. I tried to reproduce it locally, but both a telnet connection to the BlobServer as well as the BLOB download by the TaskManagers work for me.

Can you share your configuration that is causing the problem? You could also try increasing the log level to DEBUG and see if there is something more in the logs (the exception thrown in StaticFileServerHandler looks suspicious but is not related to the BlobServer).

Apparently, the TaskManager resolves flink-jobmanager to 10.104.5.130. Is that the correct address, and can the TaskManager talk to this IP? (may a firewall block this?)

Did you, by any chance, set up SSL, too? There was a recent thread on the mailing list [1] where a user had some problems with "security.ssl.verify-hostname" being set to true, which may be related.

Nico

[1] https://lists.apache.org/thread.html/879d072bfd6761947b4bd703324489db50e8b14c328992118af875d8@%3Cuser.flink.apache.org%3E

On 04/12/17 10:03, bernd.winterst...@dev.helaba.de wrote:
> Hi
> Since we switched to Release 1.4 the taskmanagers are unable to download blobs from the jobmanager. The taskmanager registration still works.
> Netstat on the jobmanager shows open ports at 6123 and 5. But a telnet connection from taskmanager to jobmanager on port 5 times out.
>
> Any ideas are welcome.
>
> Regards
> Bernd
>
> Jobmanager log:
>
> 2017-12-04 08:48:30,167 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager actor
> 2017-12-04 08:48:30,197 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-81cd12c7-394e-4777-85a1-98389b72dd08
> 2017-12-04 08:48:30,205 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:5 - max concurrent requests: 50 - max backlog: 1000
> 2017-12-04 08:48:30,608 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka.tcp://flink@flink-jobmanager:6123/user/jobmanager.
> 2017-12-04 08:48:30,628 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive
> 2017-12-04 08:48:30,676 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka.tcp://flink@flink-jobmanager:6123/user/jobmanager
> 2017-12-04 08:48:30,692 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka.tcp://flink@flink-jobmanager:6123/user/jobmanager was granted leadership with leader session ID Some(----).
> 2017-12-04 08:48:30,700 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#886586058] - leader session ----
> 2017-12-04 08:53:50,635 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - TaskManager 627338086a766c140909ba45f2e717d0 has started.
> 2017-12-04 08:53:50,638 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at flink-taskmanager-65cf757d9b-hj65d (akka.tcp://flink@flink-taskmanager-65cf757d9b-hj65d:45932/user/taskmanager) as f9d2843d0223b15d8fce52aea8231cc6. Current number of registered hosts is 1. Current number of alive task slots is 8.
> 2017-12-04 08:53:50,658 WARN akka.serialization.Serialization(akka://flink) - Using the default Java serializer for class [org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
> 2017-12-04 08:53:55,714 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - TaskManager 08c3e6f7c765e2ab88e2ea645049cb9d has started.
> 2017-12-04 08:53:55,714 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at flink-taskmanager-65cf757d9b-jtzw5 (akka.tcp://flink@flink-taskmanager-65cf757d9b-jtzw5:41710/user/taskmanager) as da8a8da3650ce53f460784c54938a071. Current number of registered hosts is 2. Current number of alive task slots is 16.
> 2017-12-04 09:04:08,850 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception
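The fix referred to above can be sketched as follows. This is a hedged example, not the poster's actual manifests: the BLOB server binds to a random ephemeral port unless blob.server.port is set, so a Kubernetes setup has to pin the port and expose the same value on the JobManager Service (the Service name and labels below are illustrative assumptions):

    # flink-conf.yaml: pin the BLOB server to a fixed port
    blob.server.port: 6124

    # Kubernetes Service excerpt exposing the RPC and BLOB ports
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      selector:
        app: flink
        component: jobmanager
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124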
Re: aggregate does not allow RichAggregateFunction?
It seems that this has to do with session windows that are mergeable? I tried the RichWindowFunction and that seems to suggest that one cannot use state? Any ideas folks...

On Dec 1, 2017 10:38 AM, "Vishal Santoshi" wrote:
> I have a simple aggregation with one caveat. For some reason I have to keep a large amount of state till the window is GCed. The state is within the accumulator (ACC). I am hitting a memory bottleneck and would like to offload the state to the state backend (RocksDB), keeping the between-checkpoint state in memory (seems to be an obvious fix). I am not, though, allowed to have a RichAggregateFunction in the aggregate method of a windowed stream. That begs 2 questions:
>
> 1. Why?
> 2. Is there an alternative for stateful window aggregation where we manage the state?
>
> Thanks
> Vishal
>
> Here is the code (it uses generics, but it works):
>
> SingleOutputStreamOperator<OUT> retVal = input
>     .keyBy(keySelector)
>     .window(EventTimeSessionWindows.withGap(gap))
>     .aggregate(
>         new AggregateFunction<IN, ACC, OUT>() {
>
>             @Override
>             public ACC createAccumulator() {
>                 ACC newInstance = (ACC) accumulator.clone();
>                 newInstance.resetLocal();
>                 return newInstance;
>             }
>
>             @Override
>             public void add(IN value, ACC accumulator) {
>                 accumulator.add(value);
>             }
>
>             @Override
>             public OUT getResult(ACC accumulator) {
>                 return accumulator.getLocalValue();
>             }
>
>             @Override
>             public ACC merge(ACC a, ACC b) {
>                 a.merge(b);
>                 return a;
>             }
>         }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>             @Override
>             public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>                 out.collect(input.iterator().next());
>             }
>         }, accType, aggregationResultType, aggregationResultType);
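A possible direction for question 2, as a minimal hedged sketch (not the poster's code): ProcessWindowFunction is a RichFunction, so keyed state registered through getRuntimeContext() lives in the configured state backend (e.g. RocksDB) and can be read and updated inside process(). Note that such state is scoped to the key, not to the window, so it must be cleaned up explicitly; all names and types below are illustrative assumptions:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class StatefulSessionSum
            extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

        private transient ValueState<Long> runningSum; // backed by the state backend

        @Override
        public void open(Configuration parameters) {
            runningSum = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("runningSum", Long.class));
        }

        @Override
        public void process(String key, Context ctx, Iterable<Long> values,
                            Collector<Long> out) throws Exception {
            Long current = runningSum.value();
            long sum = (current == null) ? 0L : current;
            for (Long v : values) {
                sum += v;
            }
            runningSum.update(sum); // kept in RocksDB rather than on the JVM heap
            out.collect(sum);
        }
    }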
Re: How to perform efficient DataSet reuse between iterations
Hello Fabian,

Thanks for the help. I am interested in the duration of specific operators, so the fact that parts of the execution are pipelined is not a problem for me. From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON. I have implemented that, and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint. Testing on 1.4.0-SNAPSHOT. When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager and the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.

The problem occurs when the number of operators in the plan increases. Consider that I execute my algorithm three (3) times through a single Flink plan. For three times, vertices and edges will be added to the graph (via Gelly methods). This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);

// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<>(eid)).name("count()");
eids.put(executionCounter, eid);

So far I have created 2 sinks in the current iteration in my regular Java program. Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function. So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks. This works fine so far. If I run with 10 iterations, I will be creating 30 sinks.

The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations). For 3 iterations, execute() proceeds normally and takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes. After five hours, there was no progress. Furthermore, I checked the web monitor periodically and there was not a single job. It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big. The exact same compiled program works with 3 iterations (via argument), but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:
- An internal limit on the number of data sinks or operators in the plan?
- A limit on message size preventing the client from sending the job? (see: https://issues.apache.org/jira/browse/FLINK-2603)

I have tried increasing akka.framesize to 256000kB in the Flink server's flink-conf.yaml and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);

I have run out of ideas with respect to the causes. Hoping you may be able to shed some light.

Thank you for your time,
Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com
Skype: miguel.e.coimbra

On 29 November 2017 at 10:23, Fabian Hueske wrote:
> The monitoring REST interface provides detailed stats about a job, its tasks, and processing vertices, including their start and end time [1].
>
> However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default. That means that the execution of multiple operators can overlap. For example, the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion. Hence, all these operations run at the same time. You can disable this behavior to some extent by setting the execution mode to batched shuffles [2]. However, this will likely have a negative impact on the overall execution time.
>
> Best, Fabian
>
> [1]
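One workaround worth sketching for plans that grow with each iteration (a hedged example, not from the thread): materialize each iteration's result to a file sink, call execute(), and read it back as the input of the next iteration, so every submitted plan stays small. Paths, types, and the algorithm stub are illustrative assumptions:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;

    public class MaterializedIterations {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            String path = "hdfs:///tmp/iteration-0"; // seed data written beforehand
            for (int i = 1; i <= 10; i++) {
                DataSet<Tuple2<Long, Double>> previous =
                        env.readCsvFile(path).types(Long.class, Double.class);
                DataSet<Tuple2<Long, Double>> result = runAlgorithm(previous);
                path = "hdfs:///tmp/iteration-" + i;
                result.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE);
                env.execute("iteration " + i); // one small plan per iteration
            }
        }

        // stand-in for the thread's MyGraphAlgorithm
        private static DataSet<Tuple2<Long, Double>> runAlgorithm(DataSet<Tuple2<Long, Double>> in) {
            return in;
        }
    }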
Re: CPU Cores of JobManager
Hi Timo

I execute streaming jobs without checkpointing and I don't configure any state backend, so it may be "MemoryStateBackend". Actually, my streaming app just reads data from Kafka and writes it to an external DB. It's not so complicated.

Regards,
Yuta

On 2017/12/05 19:55, Timo Walther wrote:
I had some profiling tool like jvisualvm in mind. Are you executing streaming or batch jobs? If streaming, is checkpointing enabled and which type of state backend?

@Chesnay do you have experience with slow behavior of the Web UI?

Regards,
Timo

On 12/5/17 10:37 AM, Yuta Morisawa wrote:
Hi Timo

Thank you for your early reply. These are the commands with which I run my apps:

./bin/yarn-session.sh -n 20 -jm 6000 -tm 24000 -s 10
./bin/flink run -p 100
./bin/flink run -p 100

So, JobManager heap memory = 6000 MB and it manages 2 jobs.

> Maybe you can use a profiler and find out which component consumes so much CPU resources?

You mean Java Flight Recorder or JITWatch? Or does Flink have its own profiler?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/application_profiling.html

Regards,
Yuta

On 2017/12/05 18:02, Timo Walther wrote:
Hi Yuta,

as far as I know you cannot assign more cores to a JobManager. Can you tell us a bit more about your environment? How many jobs does the JobManager have to manage? How much heap memory is assigned to the JobManager?

Maybe you can use a profiler and find out which component consumes so much CPU resources?

Regards,
Timo

On 12/5/17 5:13 AM, Yuta Morisawa wrote:
Hi

Now I am looking for a way to increase the number of allocated CPU cores, because my JobManager web UI is very heavy and sometimes freezes. I think this is caused by a resource shortage on the JobManager. How can I increase the number of CPUs for the JobManager in YARN mode?

Thanks
Yuta
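For reference, the application-profiling page linked above works by passing JVM options to the Flink processes via flink-conf.yaml rather than through a Flink-specific profiler. A hedged sketch along the lines of that page (Flight Recorder requires an Oracle JDK; the flags and dump path are illustrative):

    env.java.opts: "-XX:+UnlockCommercialFeatures -XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/jobmanager.jfr"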
Re: Flink Batch Performance degradation at scale
Hi,

Flink's operators are designed to work in memory as long as possible and spill to disk once the memory budget is exceeded. Moreover, Flink aims to run programs in a pipelined fashion, such that multiple operators can process data at the same time. This behavior can make it a bit tricky to analyze the runtime behavior and progress of operators.

It would be interesting to have a look at the execution plan for the program that you are running. The plan can be obtained from the ExecutionEnvironment by calling env.getExecutionPlan() instead of env.execute().

I would also like to know how you track the progress of the program. Are you looking at the record counts displayed in the WebUI?

Best, Fabian

2017-12-05 22:03 GMT+01:00 Garrett Barton:
> I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs and the ease of development is wonderful. Things have largely worked great until I tried to really scale some of the jobs recently.
>
> I have, for example, one ETL job that reads in about 12B records at a time and does a sort, some simple transformations, validation, a re-partition and then output to a Hive table. When I built it with the sample set, ~200M, it worked great, took maybe a minute and blew through it.
>
> What I have observed is there is some kind of saturation reached depending on the number of slots, number of nodes and the overall size of data to move. When I run the 12B set, the first 1B go through in under 1 minute, really really fast. But it's an extremely sharp drop-off after that: the next 1B might take 15 minutes, and then if I wait for the next 1B, it's well over an hour.
>
> What I can't find is any obvious indicators or things to look at; everything just grinds to a halt. I don't think the job would ever actually complete.
>
> Is there something in the design of Flink in batch mode that is perhaps memory bound? Adding more nodes/tasks does not fix it, just gets me a little further along. I'm already running around ~1,400 slots at this point; I'd postulate needing 10,000+ to potentially make the job run, but that's too much of my cluster gone, and I have yet to get Flink to be stable past 1,500.
>
> Any ideas on where to look, or what to debug? The GUI is also very cumbersome to use at this slot count, so other measurement ideas are welcome too!
>
> Thank you all.
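The two suggestions above can be sketched as follows (a hedged example; the program body is omitted and the names are illustrative):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PlanInspection {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // switch from pipelined to batched shuffles (may slow the job overall)
            env.getConfig().setExecutionMode(ExecutionMode.BATCH);

            // ... build the program and its sinks here ...

            // print the JSON execution plan instead of running the job
            System.out.println(env.getExecutionPlan());
        }
    }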
Flink Batch Performance degradation at scale
I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs and the ease of development is wonderful. Things have largely worked great until I tried to really scale some of the jobs recently.

I have, for example, one ETL job that reads in about 12B records at a time and does a sort, some simple transformations, validation, a re-partition and then output to a Hive table. When I built it with the sample set, ~200M, it worked great, took maybe a minute and blew through it.

What I have observed is there is some kind of saturation reached depending on the number of slots, number of nodes and the overall size of data to move. When I run the 12B set, the first 1B go through in under 1 minute, really really fast. But it's an extremely sharp drop-off after that: the next 1B might take 15 minutes, and then if I wait for the next 1B, it's well over an hour.

What I can't find is any obvious indicators or things to look at; everything just grinds to a halt. I don't think the job would ever actually complete.

Is there something in the design of Flink in batch mode that is perhaps memory bound? Adding more nodes/tasks does not fix it, just gets me a little further along. I'm already running around ~1,400 slots at this point; I'd postulate needing 10,000+ to potentially make the job run, but that's too much of my cluster gone, and I have yet to get Flink to be stable past 1,500.

Any ideas on where to look, or what to debug? The GUI is also very cumbersome to use at this slot count, so other measurement ideas are welcome too!

Thank you all.
Re: S3 Write Exception
Hi Stephan,

I am facing an S3 consistency related issue, with the exception pasted at the end. We were able to solve the S3 sync issue by adding System.currentTime to the inprogressPrefix, inprogressSuffix, s3PendingPrefix and s3PendingSuffix properties of BucketingSink.

I tried another approach by updating the BucketingSink code, wherein I have appended the partPath variable with System.currentTime (in the openNewPartFile method). Can you please let me know if this is the correct approach in order to get rid of this exception?

TimerException{java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3:///part-0-0inprogress}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3:///part-0-0inprogress
    at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
    at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
    at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)

Regards,
Vinay Patil

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
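The first workaround described above (unique prefixes rather than patching BucketingSink internals) can be expressed through the sink's public setters. A hedged sketch; the bucket path, element type, and writer are illustrative assumptions:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class UniquePrefixSink {
        public static BucketingSink<Tuple2<LongWritable, Text>> create() {
            BucketingSink<Tuple2<LongWritable, Text>> sink =
                    new BucketingSink<>("s3://my-bucket/output");
            String unique = String.valueOf(System.currentTimeMillis());
            sink.setInProgressPrefix("_" + unique + "-");  // avoid name collisions across restarts
            sink.setInProgressSuffix(".inprogress");
            sink.setPendingPrefix("_" + unique + "-");
            sink.setPendingSuffix(".pending");
            sink.setWriter(new SequenceFileWriter<LongWritable, Text>());
            return sink;
        }
    }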
Re: slot group indication per operator
Hi Tovi,

you are right, it is difficult to check the correct behavior.

@Chesnay: Do you know if we can get this information? If not through the Web UI, maybe via REST? Do we have access to the full ExecutionGraph somewhere? Otherwise it might make sense to open an issue for this.

Regards,
Timo

On 12/5/17 4:25 PM, Sofer, Tovi wrote:

Hi all,

I am trying to use the slot group feature, by having a 'default' group and an additional 'market' group. The purpose is to divide the resources equally between two sources and their following operators. I've set the slot group on the source of the market data.

Can I assume that all following operators created from this source will use the same slot group of 'market'? (The operators created for the market stream are pretty complex, with connect and split.)

In the Web UI I saw there are 16 slots, but didn't see an indication per operator to which group it was assigned. How can I know?

Relevant code:

env.setParallelism(8);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group

// Market source and operators:
KeyedStream windowedStreamA = sourceProvider.provide(env)
    .name(spotSourceProvider.getName())
    .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
    .flatMap(new ParserMapper(new MarketMessageParser()))
    .name(ParserMapper.class.getSimpleName())
    .filter(new USDFilter())
    .name(USDFilter.class.getSimpleName())
    .keyBy(MarketEvent.CURRENCY_FIELD)
    .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
    .process(new LastInWindowPriceChangeFunction())
    .name(LastInWindowPriceChangeFunction.class.getSimpleName())
    .keyBy(SpotTickEvent.CURRENCY_FIELD);

marketConnectedStream = windowedStreamA.connect(windowedStreamB)
    .flatMap(new MarketCoMapper())
    .name(MarketCoMapper.class.getSimpleName());

SplitStream stocksWithSpotsStreams = marketConnectedStream
    .split(market -> ImmutableList.of("splitA", "splitB"));

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");

Thanks and regards,
Tovi
Re: Share state across operators
Thanks, Timo. Either works for me.

On Tue, Dec 5, 2017 at 4:55 PM, Timo Walther wrote:
> Hi Shailesh,
>
> sharing state across operators is not possible. However, you could emit the state (or parts of it) as a stream element to downstream operators by having a function that emits a type like "Either".
>
> Another option would be to use side outputs to send state to downstream operators [0].
>
> Maybe you can tell us a bit more about what you want to achieve?
>
> Regards,
> Timo
>
> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html
>
> On 12/5/17 11:58 AM, Shailesh Jain wrote:
>
>> Hi,
>>
>> Is it possible to share state across operators in Flink?
>>
>> I have a CoFlatMap operator which maintains a ListState and returns a DataStream. And downstream there is a KafkaSink operator for the same DataStream which needs to access the ListState.
>>
>> Thanks,
>> Shailesh
slot group indication per operator
Hi all,

I am trying to use the slot group feature, by having a 'default' group and an additional 'market' group. The purpose is to divide the resources equally between two sources and their following operators. I've set the slot group on the source of the market data.

Can I assume that all following operators created from this source will use the same slot group of 'market'? (The operators created for the market stream are pretty complex, with connect and split.)

In the Web UI I saw there are 16 slots, but didn't see an indication per operator to which group it was assigned. How can I know?

Relevant code:

env.setParallelism(8);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group

// Market source and operators:
KeyedStream windowedStreamA = sourceProvider.provide(env)
    .name(spotSourceProvider.getName())
    .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
    .flatMap(new ParserMapper(new MarketMessageParser()))
    .name(ParserMapper.class.getSimpleName())
    .filter(new USDFilter())
    .name(USDFilter.class.getSimpleName())
    .keyBy(MarketEvent.CURRENCY_FIELD)
    .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
    .process(new LastInWindowPriceChangeFunction())
    .name(LastInWindowPriceChangeFunction.class.getSimpleName())
    .keyBy(SpotTickEvent.CURRENCY_FIELD);

marketConnectedStream = windowedStreamA.connect(windowedStreamB)
    .flatMap(new MarketCoMapper())
    .name(MarketCoMapper.class.getSimpleName());

SplitStream stocksWithSpotsStreams = marketConnectedStream
    .split(market -> ImmutableList.of("splitA", "splitB"));

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");

Thanks and regards,
Tovi
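For context, a hedged sketch of how slot sharing groups propagate (per the Flink documentation, an operator inherits the slot sharing group of its inputs, so only the source needs the explicit call); the sources and mapper below are illustrative assumptions, not the code above:

    DataStream<String> market = env
            .addSource(new MarketSource())      // hypothetical source
            .slotSharingGroup("market");        // explicit group on the source

    DataStream<String> parsed = market
            .map(new ParserMapper());           // inherits "market" from its input

    DataStream<String> other = env
            .addSource(new OtherSource());      // stays in the "default" group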
Re: Share state across operators
Hi Shailesh,

sharing state across operators is not possible. However, you could emit the state (or parts of it) as a stream element to downstream operators by having a function that emits a type like "Either".

Another option would be to use side outputs to send state to downstream operators [0].

Maybe you can tell us a bit more about what you want to achieve?

Regards,
Timo

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html

On 12/5/17 11:58 AM, Shailesh Jain wrote:
Hi,

Is it possible to share state across operators in Flink?

I have a CoFlatMap operator which maintains a ListState and returns a DataStream. And downstream there is a KafkaSink operator for the same DataStream which needs to access the ListState.

Thanks,
Shailesh
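A minimal hedged sketch of option [0] above (side outputs); the tag name, types, and the surrounding stream are illustrative assumptions rather than code from this thread:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    final OutputTag<String> stateTag = new OutputTag<String>("state-updates") {};

    SingleOutputStreamOperator<Long> main = input
            .process(new ProcessFunction<Long, Long>() {
                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) {
                    out.collect(value);                     // main stream
                    ctx.output(stateTag, "state:" + value); // state info for downstream
                }
            });

    // a downstream operator (e.g. before the Kafka sink) consumes the state here
    DataStream<String> stateStream = main.getSideOutput(stateTag);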
Re: Flink with Ceph as the persistent storage
It would be the same as with any other form of async checkpointing. No direct blocking of processing, but the network traffic might indirectly affect it to some extent :)

Jayant Ameta wrote (on Tue, Dec 5, 2017, 12:15):
> If the checkpointing to Ceph happens asynchronously, does it still have any impact on the stream processing?
>
> Jayant Ameta
>
> On Tue, Dec 5, 2017 at 4:34 PM, Gyula Fóra wrote:
>
>> Hi,
>>
>> To my understanding, Ceph as in http://ceph.com/ceph-storage/ is a block based object storage system. You can use it mounted on your server and it will behave as a local file system to most extents, but will be shared in the cluster.
>>
>> The performance might not be as good as with HDFS, in our experience.
>>
>> Gyula
>>
>> Jayant Ameta wrote (on Tue, Dec 5, 2017, 12:00):
>>
>>> Hi,
>>> The Flink documentation suggests that Ceph can be used as persistent storage for state.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
>>>
>>> Considering that Ceph is a transactional database, wouldn't it have an adverse effect on Flink's performance?
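To make the setup concrete, a hedged sketch of checkpointing to a Ceph-backed mount with the RocksDB backend (the mount path and interval are illustrative assumptions; the constructor's boolean enables incremental checkpoints):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60 seconds
    // snapshots are written asynchronously to the Ceph-mounted path
    env.setStateBackend(new RocksDBStateBackend("file:///mnt/ceph/flink-checkpoints", true));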
Re: Flink with Ceph as the persistent storage
If the checkpointing to Ceph happens asynchronously, does it still have any impact on the stream processing?

Jayant Ameta

On Tue, Dec 5, 2017 at 4:34 PM, Gyula Fóra wrote:
> Hi,
>
> To my understanding, Ceph as in http://ceph.com/ceph-storage/ is a block based object storage system. You can use it mounted on your server and it will behave as a local file system to most extents, but will be shared in the cluster.
>
> The performance might not be as good as with HDFS, in our experience.
>
> Gyula
>
> Jayant Ameta wrote (on Tue, Dec 5, 2017, 12:00):
>
>> Hi,
>> The Flink documentation suggests that Ceph can be used as persistent storage for state.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
>>
>> Considering that Ceph is a transactional database, wouldn't it have an adverse effect on Flink's performance?
Re: Share state across operators
Missed one point - I'm using managed operator state (and not keyed state, as my data streams are not keyed).

On Tue, Dec 5, 2017 at 4:28 PM, Shailesh Jain wrote:
> Hi,
>
> Is it possible to share state across operators in Flink?
>
> I have a CoFlatMap operator which maintains a ListState and returns a DataStream. And downstream there is a KafkaSink operator for the same DataStream which needs to access the ListState.
>
> Thanks,
> Shailesh
Re: Flink with Ceph as the persistent storage
Hi,

To my understanding, Ceph as in http://ceph.com/ceph-storage/ is a block based object storage system. You can use it mounted on your server and it will behave as a local file system to most extents, but will be shared in the cluster.

The performance might not be as good as with HDFS, in our experience.

Gyula

Jayant Ameta wrote (on Tue, Dec 5, 2017, 12:00):
> Hi,
> The Flink documentation suggests that Ceph can be used as persistent storage for state.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
>
> Considering that Ceph is a transactional database, wouldn't it have an adverse effect on Flink's performance?
Flink with Ceph as the persistent storage
Hi,

The Flink documentation suggests that Ceph can be used as persistent storage for state:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html

Considering that Ceph is a transactional database, wouldn't it have an adverse effect on Flink's performance?
Share state across operators
Hi,

Is it possible to share state across operators in Flink?

I have a CoFlatMap operator which maintains a ListState and returns a DataStream. And downstream there is a KafkaSink operator for the same DataStream which needs to access the ListState.

Thanks,
Shailesh
Re: CPU Cores of JobManager
I had some profiling tool like jvisualvm in mind. Are you executing streaming or batch jobs? If streaming, is checkpointing enabled and which type of state backend?

@Chesnay do you have experience with slow behavior of the Web UI?

Regards,
Timo

On 12/5/17 10:37 AM, Yuta Morisawa wrote:
Hi Timo

Thank you for your early reply. These are the commands with which I run my apps:

./bin/yarn-session.sh -n 20 -jm 6000 -tm 24000 -s 10
./bin/flink run -p 100
./bin/flink run -p 100

So, JobManager heap memory = 6000 MB and it manages 2 jobs.

> Maybe you can use a profiler and find out which component consumes so much CPU resources?

You mean Java Flight Recorder or JITWatch? Or does Flink have its own profiler?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/application_profiling.html

Regards,
Yuta

On 2017/12/05 18:02, Timo Walther wrote:
Hi Yuta,

as far as I know you cannot assign more cores to a JobManager. Can you tell us a bit more about your environment? How many jobs does the JobManager have to manage? How much heap memory is assigned to the JobManager?

Maybe you can use a profiler and find out which component consumes so much CPU resources?

Regards,
Timo

On 12/5/17 5:13 AM, Yuta Morisawa wrote:
Hi

Now I am looking for a way to increase the number of allocated CPU cores, because my JobManager web UI is very heavy and sometimes freezes. I think this is caused by a resource shortage on the JobManager. How can I increase the number of CPUs for the JobManager in YARN mode?

Thanks
Yuta
Re: Maintain heavy hitters in Flink application
Hi,

I haven't done that before either. The query API will change with the next version (Flink 1.4.0), which is currently being prepared for release. Kostas (in CC) might be able to help you.

Best, Fabian

2017-12-05 9:52 GMT+01:00 m@xi:
> Hi Fabian,
>
> Thanks for your answer. Initially, I had excluded Queryable State as an option, as it is explicitly mentioned that it is used for querying state outside Flink.
>
> Now that I am reading the documentation, I am not sure how I may achieve that. I have to set ports and addresses, which I am not sure I should, since I am reading the queryable state from inside the same job.
>
> Can you or someone elaborate further on how I can read the queryable state of a specific task from another task (e.g. a map)?
>
> Best,
> Max
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
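For completeness, a heavily hedged sketch of the Flink 1.4 queryable state client mentioned above; the host, port, job id, state name, and types are all illustrative assumptions (the API differs in earlier releases):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.queryablestate.client.QueryableStateClient;
    import java.util.concurrent.CompletableFuture;

    QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);

    CompletableFuture<ValueState<Long>> future = client.getKvState(
            JobID.fromHexString("..."),      // id of the running job
            "heavy-hitters",                 // name under which the state was made queryable
            "some-key",                      // key to look up
            BasicTypeInfo.STRING_TYPE_INFO,  // key type
            new ValueStateDescriptor<>("heavy-hitters", Long.class));

    Long value = future.get().value();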
Re: Window function support on SQL
Hi Tao,

computing a group window requires that the event-time timestamp of the DataStream is exposed as a time attribute (in your case as an event-time attribute). If you register a DataStream at the TableEnvironment, this has to be done in two steps:

1) Assign timestamps and watermarks to the DataStream [1]:

val kinesisStream = env
  .fromCollection(testData)
  .assignTimestampsAndWatermarks(yourAssigner) // yourAssigner should extract the event_timestamp field of the Avro record and assign watermarks

2) Declare the event timestamp of the DataStream as an attribute in the schema of the table [2]:

tableEnv.registerDataStream(streamName, kinesisStream, 'nd_key, 'concept_rank, 'event_timestamp.rowtime)

Once event_timestamp is declared as a time attribute, it can be used in window functions.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#event-time

2017-12-05 1:40 GMT+01:00 Tao Xia:
> Thanks for the quick response, Fabian.
>
> I have a DataStream of Avro objects. Not sure how to add a TIMESTAMP attribute or convert the event_timestamp field to a timestamp attribute for my SQL use cases. Most docs only cover the Table API with a static schema. (p.s. my Avro schema has 100+ fields.)
> Can you guide me on how to prepare my query to aggregate by nd_key and event_timestamp per hour?
>
> val testData = List(
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(10).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(15).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(20).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(25).setEventTimestamp(1512172415.longValue()).build()
> )
>
> val kinesisStream = env.fromCollection(testData)
>
> tableEnv.registerDataStream(streamName, avroStream)
>
> val query = "SELECT nd_key, sum(concept_rank) FROM " + streamName + " GROUP BY nd_key"
>
> Thanks,
> Tao
>
> On Mon, Dec 4, 2017 at 3:32 PM, Fabian Hueske wrote:
>
>> Hi,
>>
>> yes, Apache Calcite's group window functions are supported.
>>
>> The error message tells you that the attribute event_timestamp should be of type DATETIME (or TIMESTAMP) and not BIGINT. Please check the documentation for details [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>>
>> 2017-12-04 22:17 GMT+01:00 Tao Xia:
>>
>>> Hi All,
>>> Do you know if window functions are supported in SQL yet? I got the error message below when trying to use a group window function in SQL.
>>>
>>> My query:
>>>
>>> val query = "SELECT nd_key, concept_rank, event_timestamp FROM " + streamName + " GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key"
>>>
>>> Error message:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 74 to line 1, column 115: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)' 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>>     at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
>>>     at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
>>>     at com.udacity.data.pipeline.AggregationJob$.main(AggregationJob.scala:43)
>>>     at com.udacity.data.pipeline.AggregationJob.main(AggregationJob.scala)
>>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 74 to line 1, column 115: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)' 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
>>>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
>>>     at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:284)
>>>     at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:92)
>>>     at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:109)
>>>     at
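A hedged Java sketch of step 1) from Fabian's reply (the thread's code is Scala, but the pattern is the same): extract the epoch-seconds event_timestamp from the Avro record and emit watermarks with a bounded out-of-orderness. The accessor name and the 5-second bound are illustrative assumptions:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class EventTimeAssigner
            extends BoundedOutOfOrdernessTimestampExtractor<UnifiedEvent> {

        public EventTimeAssigner() {
            super(Time.seconds(5)); // tolerated out-of-orderness
        }

        @Override
        public long extractTimestamp(UnifiedEvent event) {
            // the field holds epoch seconds; Flink timestamps are milliseconds
            return event.getEventTimestamp() * 1000L;
        }
    }

The stream would then be registered as shown above, with 'event_timestamp.rowtime declaring the time attribute.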
Re: CPU Cores of JobManager
Hi Yuta,

as far as I know you cannot assign more cores to a JobManager. Can you tell us a bit more about your environment? How many jobs does the JobManager have to manage? How much heap memory is assigned to the JobManager?

Maybe you can use a profiler and find out which component consumes so much CPU resources?

Regards,
Timo

On 12/5/17 5:13 AM, Yuta Morisawa wrote:
Hi

Now I am looking for a way to increase the number of allocated CPU cores, because my JobManager web UI is very heavy and sometimes freezes. I think this is caused by a resource shortage on the JobManager. How can I increase the number of CPUs for the JobManager in YARN mode?

Thanks
Yuta
Re: subscribe
To subscribe to this mailing list, please send a mail to user-subscr...@flink.apache.org.

On 05.12.2017 08:21, Xin Wang wrote:
hello flink

--
Thanks,
Xin