Re: Blob server not working with 1.4.0.RC2

2017-12-05 Thread Bernd.Winterstein
Hi Nico
I think there were changes in the default port for the BLOB server. I missed 
the fact that the Kubernetes configuration was still exposing 6124 for the 
JobManager BLOB server.
Thanks

Bernd

-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: Monday, 4 December 2017 14:17
To: Winterstein, Bernd; user@flink.apache.org
Subject: Re: Blob server not working with 1.4.0.RC2

Hi Bernd,
thanks for the report. I tried to reproduce it locally, but both a telnet 
connection to the BlobServer and the BLOB download by the TaskManagers 
work for me. Can you share the configuration that is causing the problem? You 
could also try increasing the log level to DEBUG and see if there is something 
more in the logs (the exception thrown in StaticFileServerHandler looks 
suspicious but is not related to the BlobServer).
Apparently, the TaskManager resolves flink-jobmanager to 10.104.5.130.
Is that the correct address and can the TaskManager talk to this IP?
(may a firewall block this?)

Did you, by any chance, set up SSL, too? There was a recent thread on the 
mailing list [1] where a user had some problems with "security.ssl.verify-hostname" 
being set to true, which may be related.


Nico

[1]
https://lists.apache.org/thread.html/879d072bfd6761947b4bd703324489db50e8b14c328992118af875d8@%3Cuser.flink.apache.org%3E

On 04/12/17 10:03, bernd.winterst...@dev.helaba.de wrote:
> Hi
> Since we switched to Release 1.4 the taskmanagers are unable to
> download blobs from the jobmanager.
> The taskmanager registration still works.
> Netstat on jobmanager shows open ports at 6123 and 5. But a telnet
> connection from taskmanager to jobmanager on port 5 times out.
>
> Any ideas are welcome.
>
> Regards
>
> Bernd
>
> Jobmanager log:
>
> 2017-12-04 08:48:30,167 INFO
> org.apache.flink.runtime.jobmanager.JobManager-
> Starting JobManager actor
> 2017-12-04 08:48:30,197 INFO
> org.apache.flink.runtime.blob.BlobServer  -
> Created BLOB server storage directory
> /tmp/blobStore-81cd12c7-394e-4777-85a1-98389b72dd08
> 2017-12-04 08:48:30,205 INFO
> org.apache.flink.runtime.blob.BlobServer  -
> Started BLOB server at 0.0.0.0:5 - max concurrent requests: 50 -
> max
> backlog: 1000
> 2017-12-04 08:48:30,608 INFO
> org.apache.flink.runtime.jobmanager.JobManager-
> Starting JobManager at akka.tcp://flink@flink-jobmanager:6123/user/jobmanager.
> 2017-12-04 08:48:30,628 INFO
> org.apache.flink.runtime.jobmanager.MemoryArchivist   -
> Started memory archivist akka://flink/user/archive
> 2017-12-04 08:48:30,676 INFO
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> eManager
> - Trying to associate with JobManager leader
> akka.tcp://flink@flink-jobmanager:6123/user/jobmanager
> 2017-12-04 08:48:30,692 INFO
> org.apache.flink.runtime.jobmanager.JobManager-
> JobManager akka.tcp://flink@flink-jobmanager:6123/user/jobmanager was
> granted leadership with leader session ID
> Some(----).
> 2017-12-04 08:48:30,700 INFO
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> eManager
> - Resource Manager associating with leading JobManager
> Actor[akka://flink/user/jobmanager#886586058] - leader session
> ----
> 2017-12-04 08:53:50,635 INFO
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> eManager
> - TaskManager 627338086a766c140909ba45f2e717d0 has started.
> 2017-12-04 08:53:50,638 INFO
> org.apache.flink.runtime.instance.InstanceManager -
> Registered TaskManager at flink-taskmanager-65cf757d9b-hj65d
> (akka.tcp://flink@flink-taskmanager-65cf757d9b-hj65d:45932/user/taskma
> nager) as f9d2843d0223b15d8fce52aea8231cc6. Current number of
> registered hosts is 1. Current number of alive task slots is 8.
> 2017-12-04 08:53:50,658 WARN
> akka.serialization.Serialization(akka://flink)- Using
> the default Java serializer for class
> [org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMes
> sage] which is not recommended because of performance implications.
> Use another serializer or disable this warning using the setting
> 'akka.actor.warn-about-java-serializer-usage'
> 2017-12-04 08:53:55,714 INFO
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> eManager
> - TaskManager 08c3e6f7c765e2ab88e2ea645049cb9d has started.
> 2017-12-04 08:53:55,714 INFO
> org.apache.flink.runtime.instance.InstanceManager -
> Registered TaskManager at flink-taskmanager-65cf757d9b-jtzw5
> (akka.tcp://flink@flink-taskmanager-65cf757d9b-jtzw5:41710/user/taskma
> nager) as da8a8da3650ce53f460784c54938a071. Current number of
> registered hosts is 2. Current number of alive task slots is 16.
> 2017-12-04 09:04:08,850 ERROR
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHan
> dler
> - Caught exception

Re: aggregate does not allow RichAggregateFunction ?

2017-12-05 Thread Vishal Santoshi
It seems that this has to do with session windows that are mergeable? I
tried the RichWindowFunction and that seems to suggest that one cannot use
state? Any ideas, folks...

On Dec 1, 2017 10:38 AM, "Vishal Santoshi" 
wrote:

> I have a simple Aggregation with one caveat. For some reason I have to
> keep a large amount of state till the window is GCed. The state is within
> the Accumulator (ACC). I am hitting a memory bottleneck and would like to
> offload the state to the state backend (RocksDB), keeping the between-checkpoint
> state in memory (seems to be an obvious fix). I am, however, not
> allowed to use a RichAggregateFunction in the aggregate method of a
> windowed stream. That begs 2 questions:
>
> 1. Why?
> 2. Is there an alternative for stateful window aggregation where we manage
> the state?
>
> Thanks Vishal
>
>
> Here is the code ( generics but it works  )
>
> SingleOutputStreamOperator<OUT> retVal = input
>     .keyBy(keySelector)
>     .window(EventTimeSessionWindows.withGap(gap))
>     .aggregate(
>         new AggregateFunction<IN, ACC, OUT>() {
>
>             @Override
>             public ACC createAccumulator() {
>                 ACC newInstance = (ACC) accumulator.clone();
>                 newInstance.resetLocal();
>                 return newInstance;
>             }
>
>             @Override
>             public void add(IN value, ACC accumulator) {
>                 accumulator.add(value);
>             }
>
>             @Override
>             public OUT getResult(ACC accumulator) {
>                 return accumulator.getLocalValue();
>             }
>
>             @Override
>             public ACC merge(ACC a, ACC b) {
>                 a.merge(b);
>                 return a;
>             }
>         }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>             @Override
>             public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>                 out.collect(input.iterator().next());
>             }
>         }, accType, aggregationResultType, aggregationResultType);
>
>
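
As an illustration (not part of the thread): one commonly used alternative when a rich
function is needed for a window is to do the work in a ProcessWindowFunction via
process(), which does accept rich functions, so getRuntimeContext() and user-managed
keyed state are available there. The trade-off is that the window then buffers its
elements until it fires. A minimal sketch, assuming a keyed Tuple2<String, Long> input
stream:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Tuple2<String, Long>> summed = input
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
        @Override
        public void process(Tuple key, Context ctx,
                            Iterable<Tuple2<String, Long>> elements,
                            Collector<Tuple2<String, Long>> out) {
            // ProcessWindowFunction is a rich function, so getRuntimeContext()
            // (and therefore user-registered keyed state) is accessible here.
            long sum = 0L;
            String groupKey = null;
            for (Tuple2<String, Long> e : elements) {
                groupKey = e.f0;
                sum += e.f1;
            }
            out.collect(Tuple2.of(groupKey, sum));
        }
    });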


Re: How to perform efficient DataSet reuse between iterations

2017-12-05 Thread Miguel Coimbra
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that
parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the
Flink job with the web interface active and then make a REST call on the
appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of
specific operators.
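
For reference, a rough sketch (not from the thread) of such a REST call, assuming the
default web UI port 8081 and the job-details endpoint of the Flink 1.3/1.4 monitoring
API; the job id is a placeholder:

import java.io.InputStream;
import java.net.URL;
import java.util.Scanner;

public class OperatorDurations {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>"; // placeholder
        URL url = new URL("http://localhost:8081/jobs/" + jobId);
        try (InputStream in = url.openStream()) {
            Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A");
            String json = scanner.hasNext() ? scanner.next() : "";
            // The JSON response contains a "vertices" array; each entry carries
            // "name", "start-time", "end-time" and "duration" for one operator chain.
            System.out.println(json);
        }
    }
}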

However, I'm facing another problem whose cause I haven't been able to
pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on
Linux), the JobManager, the TaskManager(s) are launched and the web front
end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink
plan.
For three times, vertices and edges will be added to the graph (via Gelly
methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph newGraph = graph
.addVertices(verticesToAdd)
.addEdges(edgesToAdd);

// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<>(eid)).name("count()");
eids.put(executionCounter, eid);

So far I have created 2 sinks in the current iteration in my regular Java
program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000)
    .output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call
execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs
on the execute() call forever (execution time should increase linearly with
the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20
seconds per iteration, so 3 iterations is 60 seconds and 10 should be
around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a
single job.
It seems that the client program is simply not sending the job to the
cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works,
but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
(see: https://issues.apache.org/jira/browse/FLINK-2603 )
I have tried increasing the akka.framesize to 256000kB in the Flink server
flink-conf.yaml config and in the client program when creating the remote
environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig =
ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig,
jarFiles, null);

I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra

On 29 November 2017 at 10:23, Fabian Hueske  wrote:

> The monitoring REST interface provides detailed stats about a job, its
> tasks, and processing vertices, including their start and end time [1].
>
> However, it is not trivial to make sense of the execution times because
> Flink uses pipelined shuffles by default.
> That means that the execution of multiple operators can overlap. For
> example the records that are produced by a GroupReduce can be processed by
> a Map, shuffled, and sorted (for another GroupReduce) in a pipelined
> fashion.
> Hence, all these operations run at the same time. You can disable this
> behavior to some extent by setting the execution mode to batched shuffles
> [2].
> However, this will likely have a negative impact on the overall execution
> time.
>
> Best, Fabian
>
> [1] 
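
For illustration, a minimal sketch (not from the thread) of switching to batched data
exchanges as mentioned above:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Exchange data in a batched fashion instead of the default pipelined shuffles,
// so stages run one after another and per-operator times are easier to attribute.
env.getConfig().setExecutionMode(ExecutionMode.BATCH);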

Re: CPU Cores of JobManager

2017-12-05 Thread Yuta Morisawa

Hi Timo

I execute a streaming job without checkpointing and I don't configure any 
state backend, so it may be the "MemoryStateBackend".


Actually, my streaming app just reads data from Kafka and writes it to 
an external DB. It's not so complicated.


Regards,
Yuta

On 2017/12/05 19:55, Timo Walther wrote:
I had some profiling tool like jvisualvm in mind. Are you executing 
streaming or batch jobs? If streaming, is checkpointing enabled and 
which type of state backend?


@Chesnay do you have experience with slow behavior of the Web UI?

Regards,
Timo


On 12/5/17 at 10:37 AM, Yuta Morisawa wrote:

Hi Timo

Thank you for your early reply.

These are the commands with which I run my apps.
./bin/yarn-session.sh -n 20 -jm 6000 -tm 24000 -s 10
./bin/flink run -p 100  
./bin/flink run -p 100  

So, JobManager Heap Memory = 6000 MB and it manages 2 jobs.

> Maybe you can use a profiler and find out which component consumes so
> much CPU resources?
Do you mean Java Flight Recorder or JITWatch?
Or does Flink have its own profiler?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/application_profiling.html 



Regards,
Yuta

On 2017/12/05 18:02, Timo Walther wrote:

Hi Yuta,

as far as I know you cannot assign more cores to a JobManager.

Can you tell us a bit more about your environment? How many jobs does 
the JobManager have to manage? How much heap memory is assigned to the 
JobManager?


Maybe you can use a profiler and find out which component consumes so 
much CPU resources?


Regards,
Timo


On 12/5/17 at 5:13 AM, Yuta Morisawa wrote:

Hi

Now I am looking for a way to increase the number of allocated CPU 
cores because my JobManager Web UI is very heavy and sometimes freezes.


I think this is caused by a resource shortage of the JobManager.
How can I increase the number of CPUs for the JobManager in YARN mode?



Thanks
Yuta











Re: Flink Batch Performance degradation at scale

2017-12-05 Thread Fabian Hueske
Hi,

Flink's operators are designed to work in memory as long as possible and
spill to disk once the memory budget is exceeded.
Moreover, Flink aims to run programs in a pipelined fashion, such that
multiple operators can process data at the same time.
This behavior can make it a bit tricky to analyze the runtime behavior and
progress of operators.

It would be interesting to have a look at the execution plan for the
program that you are running.
The plan can be obtained from the ExecutionEnvironment by calling
env.getExecutionPlan() instead of env.execute().
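
For illustration, a minimal sketch of obtaining the plan as described above:

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... build the program (sources, transformations, sinks) as usual ...
// Print the JSON execution plan instead of calling env.execute(); the output
// can be inspected or pasted into the Flink plan visualizer.
System.out.println(env.getExecutionPlan());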

I would also like to know how you track the progress of the program.
Are you looking at the record counts displayed in the WebUI?

Best,
Fabian



2017-12-05 22:03 GMT+01:00 Garrett Barton :

> I have been moving some old MR and Hive workflows into Flink because I'm
> enjoying the APIs and the ease of development is wonderful.  Things have
> largely worked great until I tried to really scale some of the jobs
> recently.
>
> I have, for example, one ETL job that reads in about 12B records at a time
> and does a sort, some simple transformations, validation, a re-partition,
> and then output to a Hive table.
> When I built it with the sample set, ~200M, it worked great, took maybe a
> minute and blew through it.
>
> What I have observed is there is some kind of saturation reached depending
> on number of slots, number of nodes and the overall size of data to move.
> When I run the 12B set, the first 1B go through in under 1 minute, really
> really fast.  But it's an extremely sharp drop-off after that; the next 1B
> might take 15 minutes, and then if I wait for the next 1B, it's well over an
> hour.
>
> What I can't find is any obvious indicators or things to look at,
> everything just grinds to a halt, I don't think the job would ever actually
> complete.
>
> Is there something in the design of Flink in batch mode that is perhaps
> memory bound?  Adding more nodes/tasks does not fix it, just gets me a
> little further along.  I'm already running around ~1,400 slots at this
> point, I'd postulate needing 10,000+ to potentially make the job run, but
> that's too much of my cluster gone, and I have yet to get Flink to be stable
> past 1,500.
>
> Any ideas on where to look, or what to debug?  The GUI is also very
> cumbersome to use at this slot count too, so other measurement ideas are
> welcome too!
>
> Thank you all.
>


Flink Batch Performance degradation at scale

2017-12-05 Thread Garrett Barton
I have been moving some old MR and Hive workflows into Flink because I'm
enjoying the APIs and the ease of development is wonderful.  Things have
largely worked great until I tried to really scale some of the jobs
recently.

I have, for example, one ETL job that reads in about 12B records at a time
and does a sort, some simple transformations, validation, a re-partition,
and then output to a Hive table.
When I built it with the sample set, ~200M, it worked great, took maybe a
minute and blew through it.

What I have observed is there is some kind of saturation reached depending
on number of slots, number of nodes and the overall size of data to move.
When I run the 12B set, the first 1B go through in under 1 minute, really
really fast.  But it's an extremely sharp drop-off after that; the next 1B
might take 15 minutes, and then if I wait for the next 1B, it's well over an
hour.

What I can't find is any obvious indicators or things to look at; everything
just grinds to a halt, I don't think the job would ever actually complete.

Is there something in the design of Flink in batch mode that is perhaps
memory bound?  Adding more nodes/tasks does not fix it, just gets me a
little further along.  I'm already running around ~1,400 slots at this
point, I'd postulate needing 10,000+ to potentially make the job run, but
that's too much of my cluster gone, and I have yet to get Flink to be stable
past 1,500.

Any ideas on where to look, or what to debug?  The GUI is also very cumbersome
to use at this slot count too, so other measurement ideas are welcome too!

Thank you all.


Re: S3 Write Exception

2017-12-05 Thread vinay patil
Hi Stephan,

I am facing an S3 consistency-related issue, with the exception pasted at the
end:

We were able to solve the s3 sync issue by adding System.currentTime to
inprogressPrefix, inprogressSuffix, s3PendingPrefix and s3PendingSuffix
properties of BucketingSink.

I tried another approach by updating the BucketingSink code wherein I have
appended the partPath variable with System.currentTime (in openNewPartFile
method).

Can you please let me know if this is the correct approach in order to get
rid of this exception?
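
For reference, a minimal sketch of the prefix/suffix approach described above; the S3
path and the exact prefix strings are placeholders, not the job's actual values:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

String ts = String.valueOf(System.currentTimeMillis());
BucketingSink<Tuple2<String, String>> sink = new BucketingSink<>("s3://<bucket>/<base-path>");
// Make the temporary file names unique per job start to avoid clashes on S3.
sink.setInProgressPrefix("_" + ts + "_");
sink.setInProgressSuffix(".in-progress");
sink.setPendingPrefix("_" + ts + "_");
sink.setPendingSuffix(".pending");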

TimerException{java.io.IOException: Unable to create file due to concurrent
write, file corrupted potentially: s3:///part-0-0inprogress}
 at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
 at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent
write, file corrupted potentially: s3:///part-0-0inprogress
 at
com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
 at
com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
 at
com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
 at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
 at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
 at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
 at
org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
 at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
 at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
 at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)


Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: slot group indication per operator

2017-12-05 Thread Timo Walther

Hi Tovi,

you are right, it is difficult to check the correct behavior.

@Chesnay: Do you know if we can get this information? If not through the 
Web UI, maybe via REST? Do we have access to the full ExecutionGraph 
somewhere?


Otherwise it might make sense to open an issue for this.

Regards,
Timo


On 12/5/17 at 4:25 PM, Sofer, Tovi wrote:


Hi all,

I am trying to use the slot group feature, by having ‘default’ group 
and additional ‘market’ group.


The purpose is to divide the resources equally between two sources and 
their following operators.


I’ve set the slotGroup on the source of the market data.

Can I assume that all following operators created from this source 
will use the same slot group, ‘market’?


(The operators created for market stream are pretty complex, with 
connect and split).


In the Web UI I saw there are 16 slots, but didn’t see an indication per 
operator of which group it was assigned to. How can I know?


Relevant Code:
env.setParallelism(8);

conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group


// Market source and operators:

KeyedStream windowedStreamA = sourceProvider.provide(env)
    .name(spotSourceProvider.getName())
    .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
    .flatMap(new ParserMapper(new MarketMessageParser()))
    .name(ParserMapper.class.getSimpleName())
    .filter(new USDFilter())
    .name(USDFilter.class.getSimpleName())
    .keyBy(MarketEvent.CURRENCY_FIELD)
    .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
    .process(new LastInWindowPriceChangeFunction())
    .name(LastInWindowPriceChangeFunction.class.getSimpleName())
    .keyBy(SpotTickEvent.CURRENCY_FIELD);


marketConnectedStream = windowedStreamA.connect(windowedStreamB)
    .flatMap(new MarketCoMapper())
    .name(MarketCoMapper.class.getSimpleName());

SplitStream stocksWithSpotsStreams = marketConnectedStream
    .split(market -> ImmutableList.of("splitA", "splitB"));

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");


Thanks and regards,

Tovi





Re: Share state across operators

2017-12-05 Thread Shailesh Jain
Thanks, Timo.

Either works for me.

On Tue, Dec 5, 2017 at 4:55 PM, Timo Walther  wrote:

> Hi Shailesh,
>
> sharing state across operators is not possible. However, you could emit
> the state (or parts of it) as a stream element to downstream operators by
> having a function that emits a type like "Either<Output, State>".
>
> Another option would be to use side outputs to send state to downstream
> operators [0].
>
> Maybe you can tell us a bit more about what you want to achieve?
>
> Regards,
> Timo
>
> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/stream/side_output.html
>
>
On 12/5/17 at 11:58 AM, Shailesh Jain wrote:
>
> Hi,
>>
>> Is it possible to share state across operators in Flink?
>>
>> I have CoFlatMap operator which maintains a ListState and returns a
>> DataStream. And downstream there is a KafkaSink operator for the same
>> DataStream which needs to access the ListState.
>>
>> Thanks,
>> Shailesh
>>
>
>
>


slot group indication per operator

2017-12-05 Thread Sofer, Tovi
Hi all,
I am trying to use the slot group feature, by having 'default' group and 
additional 'market' group.
The purpose is to divide the resources equally between two sources and their 
following operators.
I've set the slotGroup on the source of the market data.
Can I assume that all following operators created from this source will use 
the same slot group, 'market'?
(The operators created for market stream are pretty complex, with connect and 
split).
In the Web UI I saw there are 16 slots, but didn't see an indication per operator 
of which group it was assigned to. How can I know?

Relevant Code:

env.setParallelism(8);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group

// Market source and operators:

KeyedStream windowedStreamA = sourceProvider.provide(env)
    .name(spotSourceProvider.getName())
    .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
    .flatMap(new ParserMapper(new MarketMessageParser()))
    .name(ParserMapper.class.getSimpleName())
    .filter(new USDFilter())
    .name(USDFilter.class.getSimpleName())
    .keyBy(MarketEvent.CURRENCY_FIELD)
    .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
    .process(new LastInWindowPriceChangeFunction())
    .name(LastInWindowPriceChangeFunction.class.getSimpleName())
    .keyBy(SpotTickEvent.CURRENCY_FIELD);


marketConnectedStream = windowedStreamA.connect(windowedStreamB)
    .flatMap(new MarketCoMapper())
    .name(MarketCoMapper.class.getSimpleName());



SplitStream stocksWithSpotsStreams = marketConnectedStream
    .split(market -> ImmutableList.of("splitA", "splitB"));

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");


Thanks and regards,
Tovi
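
As a side note (not from the thread): if in doubt, the slot sharing group can also be
set explicitly on each downstream operator rather than relying on it being carried over
from the source. A minimal sketch with a hypothetical source and a placeholder
transformation:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> market = env
    .addSource(new MarketSource())      // MarketSource is a hypothetical source
    .slotSharingGroup("market");

DataStream<String> parsed = market
    .map(value -> value.trim())         // placeholder transformation
    .slotSharingGroup("market");        // set the group explicitly per operator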




Re: Share state across operators

2017-12-05 Thread Timo Walther

Hi Shailesh,

sharing state across operators is not possible. However, you could emit 
the state (or parts of it) as a stream element to downstream operators 
by having a function that emits a type like 
"Either".


Another option would be to use side outputs to send state to downstream 
operators [0].


Maybe you can tell us a bit more about what you want to achieve?

Regards,
Timo

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html
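
As an illustrative sketch (not from the thread) of the side-output option: side outputs
are emitted from process functions, so the CoFlatMap would need to become a
(Co)ProcessFunction. Here "events" stands for the upstream stream, and both the records
and the state snapshots are simplified to String:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<String> stateTag = new OutputTag<String>("state-snapshots") {};

SingleOutputStreamOperator<String> mainStream = events
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(value);                           // regular record for downstream
            ctx.output(stateTag, "snapshot-of-" + value); // state information to the side output
        }
    });

// A downstream operator (e.g. the sink path) can consume the side output separately.
DataStream<String> stateStream = mainStream.getSideOutput(stateTag);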



Am 12/5/17 um 11:58 AM schrieb Shailesh Jain:

Hi,

Is it possible to share state across operators in Flink?

I have CoFlatMap operator which maintains a ListState and returns a 
DataStream. And downstream there is a KafkaSink operator for the same 
DataStream which needs to access the ListState.


Thanks,
Shailesh





Re: Flink with Ceph as the persistent storage

2017-12-05 Thread Gyula Fóra
It would be the same as with any other form of async checkpointing. No
direct blocking of processing but the network traffic might indirectly
affect it to some extent :)
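
For illustration, a minimal sketch of pointing the filesystem state backend at a shared
Ceph mount (the mount path is hypothetical); whether snapshots block processing depends
on the state backend's snapshot mode, as discussed above:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Hypothetical Ceph mount point that is shared by the JobManager and all TaskManagers.
env.setStateBackend(new FsStateBackend("file:///mnt/ceph/flink-checkpoints"));
env.enableCheckpointing(60000); // checkpoint every 60 seconds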

Jayant Ameta  wrote (on Tue, 5 Dec 2017, 12:15):

> If the checkpointing to Ceph happens asynchronously, does it still have
> any impact on the stream processing?
>
> Jayant Ameta
>
> On Tue, Dec 5, 2017 at 4:34 PM, Gyula Fóra  wrote:
>
>> Hi,
>>
>> To my understanding, Ceph (as in http://ceph.com/ceph-storage/) is a block-
>> based object storage system. You can use it mounted on your server and it will
>> behave as a local file system to a large extent, but it will be shared in the
>> cluster.
>>
>> In our experience, the performance might not be as good as with HDFS.
>>
>> Gyula
>>
>> Jayant Ameta  wrote (on Tue, 5 Dec 2017, 12:00):
>>
>>> Hi,
>>> The Flink documentation suggests that Ceph can be used as persistent storage
>>> for states.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
>>>
>>> Considering that Ceph is a transactional database, wouldn't it have
>>> an adverse effect on Flink's performance?
>>>
>>>
>>>
>


Re: Flink with Ceph as the persistent storage

2017-12-05 Thread Jayant Ameta
If the checkpointing to Ceph happens asynchronously, does it still have any
impact on the stream processing?

Jayant Ameta

On Tue, Dec 5, 2017 at 4:34 PM, Gyula Fóra  wrote:

> Hi,
>
> To my understanding, Ceph (as in http://ceph.com/ceph-storage/) is a block-
> based object storage system. You can use it mounted on your server and it will
> behave as a local file system to a large extent, but it will be shared in the
> cluster.
>
> In our experience, the performance might not be as good as with HDFS.
>
> Gyula
>
> Jayant Ameta  wrote (on Tue, 5 Dec 2017, 12:00):
>
>> Hi,
>> The Flink documentation suggests that Ceph can be used as persistent storage
>> for states. https://ci.apache.org/projects/flink/flink-docs-
>> release-1.3/dev/stream/checkpointing.html
>>
>> Considering that Ceph is a transactional database, wouldn't it have
>> an adverse effect on Flink's performance?
>>
>>
>>


Re: Share state across operators

2017-12-05 Thread Shailesh Jain
Missed one point - I'm using Managed Operator state (and not Keyed state -
as my data streams are not keyed).

On Tue, Dec 5, 2017 at 4:28 PM, Shailesh Jain 
wrote:

> Hi,
>
> Is it possible to share state across operators in Flink?
>
> I have CoFlatMap operator which maintains a ListState and returns a
> DataStream. And downstream there is a KafkaSink operator for the same
> DataStream which needs to access the ListState.
>
> Thanks,
> Shailesh
>


Re: Flink with Ceph as the persistent storage

2017-12-05 Thread Gyula Fóra
Hi,

To my understanding, Ceph (as in http://ceph.com/ceph-storage/) is a block-
based object storage system. You can use it mounted on your server and it will
behave as a local file system to a large extent, but it will be shared in the
cluster.

In our experience, the performance might not be as good as with HDFS.

Gyula

Jayant Ameta  wrote (on Tue, 5 Dec 2017, 12:00):

> Hi,
> The Flink documentation suggests that Ceph can be used as persistent storage for
> states.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
>
> Considering that Ceph is a transactional database, wouldn't it have
> an adverse effect on Flink's performance?
>
>
>


Flink with Ceph as the persistent storage

2017-12-05 Thread Jayant Ameta
Hi,
The Flink documentation suggests that Ceph can be used as persistent storage for
states.
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html

Considering that Ceph is a transactional database, wouldn't it have an adverse
effect on Flink's performance?


Share state across operators

2017-12-05 Thread Shailesh Jain
Hi,

Is it possible to share state across operators in Flink?

I have CoFlatMap operator which maintains a ListState and returns a
DataStream. And downstream there is a KafkaSink operator for the same
DataStream which needs to access the ListState.

Thanks,
Shailesh


Re: CPU Cores of JobManager

2017-12-05 Thread Timo Walther
I had some profiling tool like jvisualvm in mind. Are you executing 
streaming or batch jobs? If streaming, is checkpointing enabled and 
which type of state backend?


@Chesnay do you have experience with slow behavior of the Web UI?

Regards,
Timo


On 12/5/17 at 10:37 AM, Yuta Morisawa wrote:

Hi Timo

Thank you for your early reply.

These are the commands with which I run my apps.
./bin/yarn-session.sh -n 20 -jm 6000 -tm 24000 -s 10
./bin/flink run -p 100  
./bin/flink run -p 100  

So, JobManager Heap Memory = 6000 MB and it manages 2 jobs.

> Maybe you can use a profiler and find out which component consumes so
> much CPU resources?
Do you mean Java Flight Recorder or JITWatch?
Or does Flink have its own profiler?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/application_profiling.html 



Regards,
Yuta

On 2017/12/05 18:02, Timo Walther wrote:

Hi Yuta,

as far as I know you cannot assign more cores to a JobManager.

Can you tell us a bit more about your environment? How many jobs does 
the JobManager have to manage? How much heap memory is assigned to the 
JobManager?


Maybe you can use a profiler and find out which component consumes so 
much CPU resources?


Regards,
Timo


On 12/5/17 at 5:13 AM, Yuta Morisawa wrote:

Hi

Now I am looking for a way to increase the number of allocated CPU 
cores because my JobManager Web UI is very heavy and sometimes freezes.


I think this is caused by a resource shortage of the JobManager.
How can I increase the number of CPUs for the JobManager in YARN mode?



Thanks
Yuta








Re: Maintain heavy hitters in Flink application

2017-12-05 Thread Fabian Hueske
Hi,

I haven't done that before either. The query API will change with the next
version (Flink 1.4.0), which is currently being prepared for release.
Kostas (in CC) might be able to help you.

Best, Fabian

2017-12-05 9:52 GMT+01:00 m@xi :

> Hi Fabian,
>
> Thanks for your answer. Initially, I had excluded Queryable State as an
> option, as it is explicitly mentioned that it is used for querying state
> outside Flink.
>
> Now that I am reading the documentation, I am not sure how I may achieve
> that. I have to set ports and addresses, which I am not sure I should do, since
> I am reading the queryable state from inside the same job.
>
> Can you or someone else elaborate further on how I can read the queryable state
> of a specific task from another task (e.g. a map)?
>
> Best,
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Window function support on SQL

2017-12-05 Thread Fabian Hueske
Hi Tao,

computing a group window requires that the event-time timestamp of the
DataStream is exposed as a time attribute (in your case as an event time
attribute).
If you register the DataStream at the TableEnvironment, this has to be done in
two steps:

1) assign timestamps and watermarks to the DataStream [1]:

val kinesisStream = env
  .fromCollection(testData)
  .assignTimestampsAndWatermarks(yourAssigner) // yourAssigner should
extract the event_timestamp field of the Avro record and assign
watermarks

2) declare the event timestamp of the DataStream as an attribute in the
schema of the table [2]:

tableEnv.registerDataStream(streamName, kinesisStream, 'nd_key,
'concept_rank, 'event_timestamp.rowtime);


Once event_timestamp is declared as time attribute, it can be used in
window functions.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#event-time
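
For illustration, a possible implementation of the "yourAssigner" placeholder above,
written in Java for brevity (the Scala version is analogous). It assumes the Avro getter
getEventTimestamp() returns seconds since the epoch, as the sample values suggest, and
tolerates 10 seconds of out-of-order events:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<UnifiedEvent> withTimestamps = kinesisStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<UnifiedEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(UnifiedEvent event) {
            // Flink expects timestamps in milliseconds.
            return event.getEventTimestamp() * 1000L;
        }
    });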

2017-12-05 1:40 GMT+01:00 Tao Xia :

> Thanks for the quick response Fabian
>
> I have a DataStream of Avro objects.  Not sure how to add a TIMESTAMP
> attribute or convert the event_timestamp field to a timestamp attribute for
> my SQL use cases.  Most docs only cover the Table API with a static schema.
> p.s. my Avro schema has 100+ fields.
> Can you guide me how to prepare my query to aggregate by nd_key and
> event_timestamp per hour?
>
> val testData = List(
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(10).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(15).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(20).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(25).setEventTimestamp(1512172415.longValue()).build()
> )
>
> val kinesisStream = env.fromCollection(testData)
>
> tableEnv.registerDataStream(streamName, avroStream);
>
> val query = "SELECT nd_key, sum(concept_rank) FROM "+streamName + " GROUP
> BY nd_key"
>
> Thanks,
> Tao
>
> On Mon, Dec 4, 2017 at 3:32 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> yes, Apache Calcite's group window functions are supported.
>>
>> The error message tells you that the attribute event_timestamp should be
>> of type DATETIME (or TIMESTAMP) and not BIGINT.
>> Please check the documentation for details [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/table/sql.html#group-windows
>>
>> 2017-12-04 22:17 GMT+01:00 Tao Xia :
>>
>>> Hi All,
>>>   Do you know if window function supported on SQL yet?
>>>   I got the error message when trying to use group function in SQL.
>>>
>>> My query below:
>>>
>>> val query = "SELECT nd_key, concept_rank, event_timestamp FROM "+streamName 
>>> + " GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key"
>>>
>>>
>>> Error Message:
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>> SQL validation failed. From line 1, column 74 to line 1, column 115: Cannot
>>> apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>> at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(Fli
>>> nkPlannerImpl.scala:93)
>>> at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEn
>>> vironment.scala:561)
>>> at com.udacity.data.pipeline.AggregationJob$.main(AggregationJo
>>> b.scala:43)
>>> at com.udacity.data.pipeline.AggregationJob.main(AggregationJob.scala)
>>> Caused by: org.apache.calcite.runtime.CalciteContextException: From
>>> line 1, column 74 to line 1, column 115: Cannot apply 'TUMBLE' to arguments
>>> of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'. Supported form(s):
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(Native
>>> ConstructorAccessorImpl.java:62)
>>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(De
>>> legatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Reso
>>> urces.java:463)
>>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
>>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidati
>>> onError(SqlValidatorImpl.java:4654)
>>> at org.apache.calcite.sql.SqlCallBinding.newValidationSignature
>>> Error(SqlCallBinding.java:284)
>>> at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSi
>>> ngleOperandType(FamilyOperandTypeChecker.java:92)
>>> at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOp
>>> erandTypes(FamilyOperandTypeChecker.java:109)
>>> at 

Re: CPU Cores of JobManager

2017-12-05 Thread Timo Walther

Hi Yuta,

as far as I know you cannot assign more cores to a JobManager.

Can you tell us a bit more about your environment? How many jobs does 
the JobManager have to manage? How much heap memory is assigned to the 
JobManager?


Maybe you can use a profiler and find out which component consumes so 
much CPU resources?


Regards,
Timo


On 12/5/17 at 5:13 AM, Yuta Morisawa wrote:

Hi

Now I am looking for a way to increase the number of allocated CPU 
cores because my JobManager Web UI is very heavy and sometimes freezes.


I think this is caused by a resource shortage of the JobManager.
How can I increase the number of CPUs for the JobManager in YARN mode?



Thanks
Yuta





Re: subscribe

2017-12-05 Thread Chesnay Schepler
To subscribe to this mailing list, please send a mail to 
user-subscr...@flink.apache.org .


On 05.12.2017 08:21, Xin Wang wrote:

hello flink

--
Thanks,
Xin