Re: REST Endpoint for Triggering Savepoint

2017-11-07 Thread Chesnay Schepler

No, this is not currently supported, but we plan to include this in 1.5.
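
(In the meantime a savepoint can already be triggered without cancelling the job from
the command line, e.g. `bin/flink savepoint <jobID>`; it is only the REST endpoint for
this that is missing.)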

On 07.11.2017 20:01, vijayakumar palaniappan wrote:

Hi,

Is there a REST endpoint for triggering savepoint without cancelling 
the job?


--
Thanks,
-Vijay





REST Endpoint for Triggering Savepoint

2017-11-07 Thread vijayakumar palaniappan
Hi,

Is there a REST endpoint for triggering savepoint without cancelling the
job?

-- 
Thanks,
-Vijay


AvroParquetWriter may cause task managers to get lost

2017-11-07 Thread Ivan Budincevic
Hi all,

We recently implemented a feature in our streaming Flink job in which we build an 
AvroParquetWriter every time the overridden “write” method from 
org.apache.flink.streaming.connectors.fs.Writer gets called. We had to do this 
because the schema of each record is potentially different and we have to extract 
the schema for the AvroParquetWriter from the record itself first. Previously 
the writer was built only once, in the “open” method, and from then on only 
the write method was called per record.

Since implementing this, our job crashes with “Connection unexpectedly closed by 
remote task manager ‘internal company url’. This might indicate that the remote 
task manager was lost.”

We did not run into any issues on our test environments, so we suspect this 
problem occurs only under higher loads, such as we have on our production 
environment. Unfortunately we still don’t have a proper means of reproducing 
this much load on our test environment to debug.

Could building the AvroParquetWriter on every write be causing the problem, and 
if so, why would that be the case?

Any help in getting to the bottom of the issue would be really appreciated. 
Below is a code snippet of the class which uses the AvroParquetWriter.

Best regards,
Ivan Budincevic
Software engineer, bol.com
Netherlands

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  private transient ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    this.path = path;
  }

  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {
    // A new writer is built for every incoming slot, using the schema of its first record.
    // Note that the previously built writer is never closed here.
    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
      AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(slot.getMeasurements().get(0).getSchema())
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .withDictionaryEncoding(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }

    parquetWriter = writerBuilder.build();

    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }

  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}
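
One detail worth noting in the snippet above: every call to “write” builds a fresh
ParquetWriter, but the previously built writer is never closed, so its buffered data
and open file handle are never explicitly released. A rough sketch of releasing the
old writer first (same field and builder names as in the class above; this is not a
verified fix for the lost-TaskManager problem):

// inside write(), instead of the plain "parquetWriter = writerBuilder.build();"
if (parquetWriter != null) {
  parquetWriter.close();   // flush and release the previous writer before replacing it
}
parquetWriter = writerBuilder.build();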




Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-07 Thread ashish pok
Thanks Fabian.
I am seeing this consistently and can definitely use some help. I have plenty 
of Grafana views I can share if that helps :)

Sent from Yahoo Mail on Android 
 
On Tue, Nov 7, 2017 at 3:54 AM, Fabian Hueske wrote:

Hi Ashish,
Gordon (in CC) might be able to help you.
Cheers, Fabian

2017-11-05 16:24 GMT+01:00 Ashish Pokharel :

All,

I am starting to notice a strange behavior in a particular streaming app. I 
initially thought it was a Producer issue, as I was seeing timeout exceptions 
(records expiring in the queue). I did try to modify request.timeout.ms, linger.ms 
etc. to help with the issue in case it was caused by a sudden burst of data or 
something along those lines. However, this caused the app to build up back 
pressure and become slower and slower until that timeout was reached. With 
lower timeouts, the app would actually raise the exception and recover faster. I can 
tell it is not related to connectivity, as other apps are running just fine 
around the same time frame connected to the same brokers (we have at least 10 
streaming apps connected to the same list of brokers) from the same data nodes. We 
have enabled the Graphite reporter in all of our applications. After deep diving 
into some of the consumer and producer stats, I noticed that the consumer fetch-rate 
drops tremendously while fetch-size grows exponentially BEFORE the producer 
actually starts to show higher response times and lower rates. Eventually, I 
noticed connection resets start to occur and connection counts go up 
momentarily. After that, things get back to normal. Data producer rates remain 
constant around that timeframe - we have a Logstash producer sending data over. 
We checked both Logstash and Kafka metrics and they seem to show the same 
pattern (a sort of sine wave) throughout.
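
For reference, producer settings like the ones mentioned above (request.timeout.ms,
linger.ms) are passed to the Flink Kafka producer through a java.util.Properties
object. A minimal sketch, assuming a DataStream<String> named stream and an
illustrative broker and topic; the values are placeholders, not recommendations:

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker1:9092");
// give queued records more time before the producer expires them
producerProps.setProperty("request.timeout.ms", "60000");
// small batching delay to smooth out bursts
producerProps.setProperty("linger.ms", "50");

stream.addSink(new FlinkKafkaProducer010<>(
    "output-topic", new SimpleStringSchema(), producerProps));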

It seems to point to a Kafka issue (perhaps some tuning between the Flink app and 
Kafka), but I wanted to check with the experts before I start knocking down the Kafka 
admin's doors. Is there anything else I can look into? There are quite a few 
default stats in Graphite, but those were the ones that made the most sense.

Thanks, Ashish

  


Re: Flink memory usage

2017-11-07 Thread Greg Hogan
I’ve used the following simple script to capture Flink metrics by running:
python -u ./statsd_server.py 9020 > statsd_server.log


>>> flink-conf.yaml
metrics.reporters: statsd_reporter
metrics.reporter.statsd_reporter.class: 
org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.statsd_reporter.host: 
metrics.reporter.statsd_reporter.port: 9020


>>> statsd_server.py
#!/usr/bin/env python

import socket
import sys
import time

if len(sys.argv) < 2:
  print('Usage: {} <port>'.format(sys.argv[0]))
  sys.exit(-1)

UDP_IP = ''
UDP_PORT = int(sys.argv[1])

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

while True:
  data, addr = sock.recvfrom(4096)
  print('{:.6f} {}'.format(time.time(), data))


> On Nov 5, 2017, at 4:40 AM, Jürgen Thomann  
> wrote:
> 
> Can you use wget (curl will work as well)? You can find the taskmanagers with 
> wget -O - http://localhost:8081/taskmanagers
> and wget -O - http://localhost:8081/taskmanagers/<taskmanager-id>
> to see detailed JVM memory stats. localhost:8081 is the jobmanager in my example.
> 
> 
> On 04.11.2017 16:19, AndreaKinn wrote:
>> Anyway, if I understood how the system metrics work (the results seem to be
>> shown in a browser), I can't use them because my cluster is accessible only
>> via ssh from a terminal
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Job Manager Configuration

2017-11-07 Thread Till Rohrmann
Hi Regina,

the user code is uploaded once to the `JobManager` and then downloaded by
each `TaskManager` once, when it first receives the command to execute the
first task of your job.

As Chesnay said, there is no fundamental limitation to the size of a Flink
job. However, it might be the case that you have configured your job
sub-optimally. You said that you have 300 parallel flows. Depending on
whether you've defined separate slot sharing groups for them or not, it
might be the case that parallel subtasks of all 300 flows share the
same slot (if you haven't changed the slot sharing group). Depending on
what you calculate, this can be inefficient because the individual tasks
don't get much computation time. Moreover, all tasks will allocate some
objects on the heap, which can lead to more GC. Therefore, it might make
sense to group some of the flows together and run these groups in batches,
each after the previous batch has completed. But this is hard to say without
knowing the details of your job and getting a glimpse at the JobManager logs.
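
For illustration, a slot sharing group is set per operator on the DataStream API. A
minimal sketch with made-up names (Event, sourceOne, EnrichFunction, sinkOne):

DataStream<Event> flowOne = env.addSource(sourceOne).slotSharingGroup("flow-1");
flowOne
    .map(new EnrichFunction()).slotSharingGroup("flow-1")
    .addSink(sinkOne).slotSharingGroup("flow-1");

Operators inherit the slot sharing group of their inputs, so setting the group on the
source of each flow is usually enough.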

Concerning the exception you're seeing, it would also be helpful to see the
logs of the client and the JobManager. Actually, the scheduling of the job
is independent of the response. Only the creation of the ExecutionGraph and
making the JobGraph highly available in case of an HA setup are executed
before the JobManager acknowledges the job submission. Only if this
acknowledgement message is not received in time on the client side is the
SubmissionTimeoutException thrown. Therefore, I assume that the JobManager
is somehow too busy or kept from sending the acknowledgement message.

Cheers,
Till



On Thu, Nov 2, 2017 at 7:18 PM, Chan, Regina  wrote:

> Does it copy per TaskManager or per operator? I only gave it 10
> TaskManagers with 2 slots. I’m perfectly fine with it queuing up and
> running when it has the resources to.
>
>
>
>
>
>
>
> *From:* Chesnay Schepler [mailto:ches...@apache.org]
> *Sent:* Wednesday, November 01, 2017 7:09 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Job Manager Configuration
>
>
>
> AFAIK there is no theoretical limit on the size of the plan, it just
> depends on the available resources.
>
>
> The job submission times out since it takes too long to deploy all the
> operators that the job defines. With 300 flows, each with 6 operators,
> you're looking at potentially (1800 * parallelism) tasks that have to be
> deployed. For each task Flink copies the user code of *all* flows to the
> executing TaskManager, which the network may just not be able to handle in time.
>
> I suggest splitting your job into smaller batches or even running each of them
> independently.
>
> On 31.10.2017 16:25, Chan, Regina wrote:
>
> Asking an additional question, what is the largest plan that the
> JobManager can handle? Is there a limit? My flows don’t need to run in
> parallel and can run independently. I wanted them to run in one single job
> because it’s part of one logical commit on my side.
>
>
>
> Thanks,
>
> Regina
>
>
>
> *From:* Chan, Regina [Tech]
> *Sent:* Monday, October 30, 2017 3:22 PM
> *To:* 'user@flink.apache.org'
> *Subject:* Job Manager Configuration
>
>
>
> Flink Users,
>
>
>
> I have about 300 parallel flows in one job each with 2 inputs, 3
> operators, and 1 sink which makes for a large job. I keep getting the below
> timeout exception but I’ve already set it to a 30 minute time out with a
> 6GB heap on the JobManager? Is there a heuristic to better configure the
> job manager?
>
>
>
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure
> and confirm the job submission.
>
>
>
> *Regina Chan*
>
> *Goldman Sachs* *–* Enterprise Platforms, Data Architecture
>
> *30 Hudson Street, 37th floor | Jersey City, NY 07302
> *
> (212) 902-5697
>
>
>
>
>


Re: FlinkCEP behaviour with time constraints not as expected

2017-11-07 Thread Dawid Wysakowicz
Hi Federico,

For your given input and pattern there should be (and there are) only two
timed-out patterns:

5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02

This is because your pattern says the next event after events with value >= 100
should not have value >= 100. And within your timeout there is no sequence of
events of the form (>=100)+ (<100).

But I will try to explain how it works with the same input for Pattern:

Pattern[Event].begin("start").where(_.value >=100).oneOrMore
.notNext("end").where(_.value <100).within(Time.minutes(30))

Then we have matches:

5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02
5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
Event(100,2017-11-05T03:52:02
5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02
5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02
5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02),
Event(100,2017-11-05T03:54:02
5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02

and the timed-out partial matches:

5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02),
Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02),
Event(100,2017-11-05T03:56:02
5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02),
Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02
5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02),
Event(100,2017-11-05T03:56:02
5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02

Right now (in Flink 1.3.2) a pattern can start on each event (in 1.4 you will
be able to specify an AFTER_MATCH_SKIP strategy, see:
https://issues.apache.org/jira/browse/FLINK-7169), therefore you see
matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02 and
2017-11-05T03:54:02.
Also, right now oneOrMore is not greedy (in 1.4 you will be able to
alter this, see: https://issues.apache.org/jira/browse/FLINK-7147), therefore
you see matches like List(Event(100,2017-11-05T03:50:02)) and
List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))
rather than only one of those.
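
For reference, a rough sketch of what those two 1.4 options could look like with the
Java CEP API (assuming an Event POJO with a getValue() accessor; untested against the
example discussed above):

Pattern<Event, ?> pattern =
    Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getValue() >= 100;
            }
        })
        .oneOrMore().greedy()   // greedy variant of oneOrMore (FLINK-7147)
        .notNext("end")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getValue() < 100;
            }
        })
        .within(Time.minutes(30));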

The timed-out partial matches are returned because within the timeout there
was no event with value < 100 (in fact there was no event at all to be
checked).

Hope this "study" helps you understand the behaviour. If you feel I missed
something, please provide some example I could reproduce.

Regards,
Dawid

2017-11-07 11:29 GMT+01:00 Ufuk Celebi :

> Hey Federico,
>
> let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
> the expected behaviour here.
>
> Best,
>
> Ufuk
>
>
> On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
>  wrote:
> > Hi everyone,
> >
> > I wanted to ask if FlinkCEP in the following scenario is working as it
> > should, or I have misunderstood its functioning.
> >
> > I've got a keyedstream associated with the following pattern:
> >
> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> > .notNext("end").where(_.value >=100).within(Time.minutes(30))
> >
> > Considering a single key in the stream, for simplicity, I've got the
> > following sequence of events (using EventTime on the "time" field of the
> > json event):
> >
> > {value: 100, time: "2017-11-05 03:50:02.000"}
> > {value: 100, time: "2017-11-05 03:52:02.000"}
> > {value: 100, time: "2017-11-05 03:54:02.000"}
> > {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within
> the 30
> > minutes from the first event
> > {value: 100, time: "2017-11-05 06:00:02.000"}
> >
> > Now, when it comes to the select/flatselect function, I tried printing
> the
> > content of the pattern map and what I noticed is that, for example, the
> > first 2 events weren't considered in the same pattern as the map was like
> > the following:
> >
> > {start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
> > {start=[{value: 100, time: 2017-11-05 03:52:02.000}]}
> >
> > Now, shouldn't they be in the same List, as they belong to the same
> > iterative pattern, defined with the oneOrMore clause?
> >
> > Thank you for your insight,
> > Federico D'Ambrosio
>


Re: ExecutionGraph not serializable

2017-11-07 Thread Till Rohrmann
Hi XiangWei,

it is actually not intended that you get access to the ExecutionGraph, because it
is a runtime component which does not make much sense outside of the
JobManager. The RequestJob message is only a hack to make the
ExecutionGraph accessible to another actor running in the same ActorSystem.
This is the case for the WebRuntimeMonitor handlers. With FLIP-6, we will
make the ExecutionGraph indirectly accessible by returning
an ArchivedExecutionGraph.

Cheers,
Till

On Tue, Nov 7, 2017 at 2:47 PM, XiangWei Huang 
wrote:

> hi Till,
>
>Sorry,I've made a mistake,i used 
> *StandaloneClusterClient*#*getJobManagerGateway
> *to get  *ActorGateway *to communicate with *JobManager *instead of using
> *JobMasterGateway*.
> Below is the code i executed for getting ExecuteGraph of a Job.
>
>
> val flinkConfig = new Configuration()
> val flinkCli = new StandaloneClusterClient(flinkConfig)
> *val jobManagerGateWay = flinkCli.getJobManagerGateway*
> val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus,new
> FiniteDuration(10,TimeUnit.SECONDS)).asInstanceOf[Future[
> RunningJobsStatus]]
> val jobsStatus = Await.result(jobs,new FiniteDuration(10,TimeUnit.
> SECONDS)).getStatusMessages().asScala.head
> val jobId = jobsStatus.getJobId
> val timeOut = new FiniteDuration(10,TimeUnit.SECONDS)
> *val future = jobManagerGateWay.ask(RequestJob(jobId),timeOut)*
> val result = Await.result(future,timeOut)
>
> JobManager threw NotSerializableException  when i executed this code. So i
> wonder how is this happened and is there another way to get a job's
> ExecutionGraph programmatically.
>
> Best,XiangWei
>
> 2017-11-07 17:16 GMT+08:00 Till Rohrmann :
>
>> Hi XiangWei,
>>
>> how do you use the JobMasterGateway with the actor message RequestJob?
>> The JobMasterGateway is a Java interface and does not represent an
>> ActorCell to which you can send actor messages. Instead you should call
>> JobMasterGateway#requestArchivedExecutionGraph.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske  wrote:
>>
>>> Hi XiangWei,
>>>
>>> I don't think this is a public interface, but Till (in CC) might know
>>> better.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-11-06 3:27 GMT+01:00 XiangWei Huang :
>>>
 Hi Flink users,
 Flink Jobmanager throw a NotSerializableException when i used
 JobMasterGateway to get ExecutionGraph of a specific job with
 message *RequestJob(jobID). *Blow is the detail of Exception:


 [ERROR] [akka.remote.EndpointWriter] - Transient association error 
 (association remains live)java.io.NotSerializableException: 
 org.apache.flink.runtime.executiongraph.ExecutionGraph
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
 akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at 
 akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at 
 akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at 
 akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
at 
 akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(For

Call for responses: Apache Flink user survey 2017

2017-11-07 Thread Till Rohrmann
Hi everyone,

data Artisans is running a second annual Apache Flink user survey [1] in
order to understand Flink usage and the needs of the community. This survey
will help to shape the Flink roadmap and make Flink the best that it can be
for users.

We'll publish a report with a summary of findings at the conclusion of the
survey. All of your responses will remain confidential, and only aggregate
statistics will be shared.

We expect the survey to take 5-10 minutes, and all questions are
optional--we appreciate any feedback that you're willing to provide. The
survey will be open for responses until Monday, November 27.

As a thank you, respondents will be entered in a drawing to win one of 10
tickets to Flink Forward 2018 (your choice of the second-annual San
Francisco event on April 9-10 or the Berlin event on September 3-5).

We look forward to hearing from you.

Cheers,
Till

[1]
http://www.surveygizmo.com/s3/3166399/Apache-Flink-User-Survey-2ecff2d56551


Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-07 Thread Shankara
Hi,

I am using Flink 2.1.0 and protobuf-java 2.6.1.
I am getting the below exception for a protobuf-generated class. I have included
the jar which contains that class.

Can you please help me check it?

org.apache.beam.sdk.util.UserCodeException:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at
org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:368)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$StripIdsMap.flatMap(FlinkStreamingTransformTranslators.java:213)
at
org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$StripIdsMap.flatMap(FlinkStreamingTransformTranslators.java:207)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:309)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:408)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:329)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:267)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at
org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:730)
at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:

Re: Flink memory leak

2017-11-07 Thread Aljoscha Krettek
I agree with Ufuk, it would be helpful to know what stateful operations are in 
the jobs (including windowing).

> On 7. Nov 2017, at 14:53, Ufuk Celebi  wrote:
> 
> Do you use any windowing? If yes, could you please share that code? If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru  wrote:
>> Hi Ufuk,
>> 
>> We don’t explicitly define any state descriptor. We only use map and filters
>> operator. We thought that gc handle clearing the flink’s internal states.
>> So how can we manage the memory if it is always increasing?
>> 
>> - Ebru
>> 
>> On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:
>> 
>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>> This is expected (also in the case of multiple running jobs). The
>> screenshots are not helpful in that regard. :-(
>> 
>> What kind of stateful operations are you using? Depending on your use case,
>> you have to manually call `clear()` on the state instance in order to
>> release the managed state.
>> 
>> Best,
>> 
>> Ufuk
>> 
>> On Tue, Nov 7, 2017 at 12:43 PM, ebru  wrote:
>>> 
>>> 
>>> 
>>> Begin forwarded message:
>>> 
>>> From: ebru 
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi 
>>> 
>>> Hi Ufuk,
>>> 
>>> There are there snapshots of htop output.
>>> 1. snapshot is initial state.
>>> 2. snapshot is after submitted one job.
>>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
>>> usage is always increasing over time.
>>> 
>>> 
>>> 
>>> 
>>> <1.png><2.png><3.png>
>>> 
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi  wrote:
>>> 
>>> Hey Ebru,
>>> 
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>> 
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>  wrote:
>>> 
>>> Hi,
>>> 
>>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>>> managers in standalone mode. Recently, we've noticed that we have memory
>>> related problems. We use docker container to serve Flink cluster. We have
>>> 300 slots and 20 jobs are running with parallelism of 10. Also the job
>>> count
>>> may be change over time. Taskmanager memory usage always increases. After
>>> job cancelation this memory usage doesn't decrease. We've tried to
>>> investigate the problem and we've got the task manager jvm heap snapshot.
>>> According to the jam heap analysis, possible memory leak was Flink list
>>> state descriptor. But we are not sure that is the cause of our memory
>>> problem. How can we solve the problem?
>>> 
>>> 
>>> 
>> 
>> 
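
For reference, the explicit cleanup Ufuk refers to above means calling clear() on a
state instance once it is no longer needed for the current key. A minimal sketch with
illustrative names (Event, isEndOfSession()), unrelated to the job discussed here:

public class DedupFunction extends RichFlatMapFunction<Event, Event> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(event);
        }
        if (event.isEndOfSession()) {   // hypothetical "done with this key" condition
            seen.clear();               // releases the managed state for the current key
        }
    }
}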



Re: Flink memory leak

2017-11-07 Thread Ufuk Celebi
Do you use any windowing? If yes, could you please share that code? If
there is no stateful operation at all, it's strange where the list
state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru  wrote:
> Hi Ufuk,
>
> We don’t explicitly define any state descriptor. We only use map and filters
> operator. We thought that gc handle clearing the flink’s internal states.
> So how can we manage the memory if it is always increasing?
>
> - Ebru
>
> On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:
>
> Hey Ebru, the memory usage might be increasing as long as a job is running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
>
> What kind of stateful operations are you using? Depending on your use case,
> you have to manually call `clear()` on the state instance in order to
> release the managed state.
>
> Best,
>
> Ufuk
>
> On Tue, Nov 7, 2017 at 12:43 PM, ebru  wrote:
>>
>>
>>
>> Begin forwarded message:
>>
>> From: ebru 
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi 
>>
>> Hi Ufuk,
>>
>> There are there snapshots of htop output.
>> 1. snapshot is initial state.
>> 2. snapshot is after submitted one job.
>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
>> usage is always increasing over time.
>>
>>
>>
>>
>> <1.png><2.png><3.png>
>>
>> On 7 Nov 2017, at 13:34, Ufuk Celebi  wrote:
>>
>> Hey Ebru,
>>
>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>
>> Since multiple jobs are running, it will be hard to understand to
>> which job the state descriptors from the heap snapshot belong to.
>> - Is it possible to isolate the problem and reproduce the behaviour
>> with only a single job?
>>
>> – Ufuk
>>
>>
>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>  wrote:
>>
>> Hi,
>>
>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>> managers in standalone mode. Recently, we've noticed that we have memory
>> related problems. We use docker container to serve Flink cluster. We have
>> 300 slots and 20 jobs are running with parallelism of 10. Also the job
>> count
>> may be change over time. Taskmanager memory usage always increases. After
>> job cancelation this memory usage doesn't decrease. We've tried to
>> investigate the problem and we've got the task manager jvm heap snapshot.
>> According to the jam heap analysis, possible memory leak was Flink list
>> state descriptor. But we are not sure that is the cause of our memory
>> problem. How can we solve the problem?
>>
>>
>>
>
>


Re: ExecutionGraph not serializable

2017-11-07 Thread XiangWei Huang
Hi Till,

Sorry, I've made a mistake: I used StandaloneClusterClient#getJobManagerGateway
to get an ActorGateway to communicate with the JobManager instead of using
JobMasterGateway.
Below is the code I executed for getting the ExecutionGraph of a job.


val flinkConfig = new Configuration()
val flinkCli = new StandaloneClusterClient(flinkConfig)
val jobManagerGateWay = flinkCli.getJobManagerGateway
val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus, new FiniteDuration(10, TimeUnit.SECONDS))
  .asInstanceOf[Future[RunningJobsStatus]]
val jobsStatus = Await.result(jobs, new FiniteDuration(10, TimeUnit.SECONDS))
  .getStatusMessages().asScala.head
val jobId = jobsStatus.getJobId
val timeOut = new FiniteDuration(10, TimeUnit.SECONDS)
val future = jobManagerGateWay.ask(RequestJob(jobId), timeOut)
val result = Await.result(future, timeOut)

The JobManager threw a NotSerializableException when I executed this code. So I
wonder how this happened and whether there is another way to get a job's
ExecutionGraph programmatically.

Best,XiangWei

2017-11-07 17:16 GMT+08:00 Till Rohrmann :

> Hi XiangWei,
>
> how do you use the JobMasterGateway with the actor message RequestJob?
> The JobMasterGateway is a Java interface and does not represent an
> ActorCell to which you can send actor messages. Instead you should call
> JobMasterGateway#requestArchivedExecutionGraph.
>
> Cheers,
> Till
> ​
>
> On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske  wrote:
>
>> Hi XiangWei,
>>
>> I don't think this is a public interface, but Till (in CC) might know
>> better.
>>
>> Best,
>> Fabian
>>
>> 2017-11-06 3:27 GMT+01:00 XiangWei Huang :
>>
>>> Hi Flink users,
>>> Flink Jobmanager throw a NotSerializableException when i used
>>> JobMasterGateway to get ExecutionGraph of a specific job with
>>> message *RequestJob(jobID). *Blow is the detail of Exception:
>>>
>>>
>>> [ERROR] [akka.remote.EndpointWriter] - Transient association error 
>>> (association remains live)java.io.NotSerializableException: 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at 
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at 
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at 
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at 
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>> at 
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>> at 
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>> at 
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>> at 
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
>>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
>>> at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> at 
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> So,is it a bug or the way to get job’s executionGraph is invalid.
>>>
>>>
>>> Best,XiangWei
>>>
>>>
>>>
>>
>


Re: Flink memory leak

2017-11-07 Thread ebru
Hi Ufuk,

We don’t explicitly define any state descriptors. We only use map and filter 
operators. We thought that GC handles clearing Flink’s internal state. 
So how can we manage the memory if it is always increasing?

- Ebru
> On 7 Nov 2017, at 16:23, Ufuk Celebi  wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is running. 
> This is expected (also in the case of multiple running jobs). The screenshots 
> are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your use case, 
> you have to manually call `clear()` on the state instance in order to release 
> the managed state.
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 7, 2017 at 12:43 PM, ebru  > wrote:
> 
> 
>> Begin forwarded message:
>> 
>> From: ebru
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi <u...@apache.org>
>> 
>> Hi Ufuk,
>> 
>> There are there snapshots of htop output.
>> 1. snapshot is initial state.
>> 2. snapshot is after submitted one job.
>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory 
>> usage is always increasing over time.
>> 
>> 
>> 
>> 
>> <1.png><2.png><3.png>
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi wrote:
>>> 
>>> Hey Ebru,
>>> 
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>> 
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
 Hi,
 
 We are using Flink 1.3.1 in production, we have one job manager and 3 task
 managers in standalone mode. Recently, we've noticed that we have memory
 related problems. We use docker container to serve Flink cluster. We have
 300 slots and 20 jobs are running with parallelism of 10. Also the job 
 count
 may be change over time. Taskmanager memory usage always increases. After
 job cancelation this memory usage doesn't decrease. We've tried to
 investigate the problem and we've got the task manager jvm heap snapshot.
 According to the jam heap analysis, possible memory leak was Flink list
 state descriptor. But we are not sure that is the cause of our memory
 problem. How can we solve the problem?
>> 
> 
> 



Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Chesnay Schepler

Have you tried removing the "cygdrive" portion from the path?

Something along the lines of

“state.backend.fs.checkpointdir: file:///Y:/flink-checkpoint-dir
state.checkpoints.dir: Y:/flink-checkpoints
state.backend: filesystem
high-availability.storageDir: file:///Y:/flink-recovery
"



On 07.11.2017 12:36, Jordan Kuan wrote:

Dear Ufuk,

Sorry, I still can’t get that work, I have double checked all the 
nodes can access the mapped drive Y.
Here is my state backend configuration, and it writes files to local C 
drive.


“state.backend.fs.checkpointdir: file:///cygdrive/Y/flink-checkpoint-dir
state.checkpoints.dir: /cygdrive/Y/flink-checkpoints
state.backend: filesystem
high-availability.storageDir: file:///cygdrive/Y/flink-recovery
"

I have attached the flink-conf.yaml and screenshots, Is here any wrong 
setting?


Thanks,
Jordan

On 7 Nov 2017, at 6:49 PM, Ufuk Celebi wrote:


Hey Jordan,

yeah, that should just work. Check out the state backend configuration
here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html 



– Ufuk

On Tue, Nov 7, 2017 at 11:44 AM, Jordan Kuan > wrote:

Dear Ufuk,

Thank you for your reply.

All the cluster Flink servers are able to access network drive, and 
it mapped as drive Y in all nodes.

Do I need to provide more information?

Thanks,
Jordan


On 7 Nov 2017, at 6:36 PM, Ufuk Celebi > wrote:


As answered by David on SO, the files need to be accessible by all
nodes. In your setup this seems not to be the case, therefore it won't
work. You need a distributed file system (e.g. NFS or HDFS) or object
store (e.g. S3) that is accessible from all nodes.

– Ufuk


On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan > wrote:

Dear Flink Dev Team

I have encountered a problem and can't find any solution in Google.
And I have created a thread in stackoverflow.com 
 but no response.
https://stackoverflow.com/questions/47123371/flink-windows-ha 



I would really appreciate it if you could give some suggestions to me.

Thanks,

Jordan

On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger <rmetz...@apache.org> wrote:


Hi,
I would suggest to send your question to the 
user@flink.apache.org  list

(make sure to subscribe first)

-- Forwarded message --
From: 
>

Date: Mon, Nov 6, 2017 at 5:23 PM
Subject: MODERATE for d...@flink.apache.org 


To:
Cc:
dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gmail@flink.apache.org 





To approve:
dev-accept-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org 


To reject:
dev-reject-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org 


To give a reason to reject:
%%% Start comment
%%% End comment



-- Forwarded message --
From: Jordan Kuan >

To: d...@flink.apache.org 
Cc:
Bcc:
Date: Tue, 7 Nov 2017 00:22:53 +0800
Subject: Flink Windows HA Issue.
Dear Flink Dev Team

I have encountered a problem and can't find any solution in Google.
And I have created a thread in stackoverflow.com 
 but no response.
https://stackoverflow.com/questions/47123371/flink-windows-ha 



I would really appreciate it if you could give some suggestions 
to me.


Thanks,

Jordan







--
Best Regards,
Jordan Kuan








Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Jordan Kuan
Dear Ufuk,

Sorry, I still can’t get that to work. I have double-checked that all the nodes can
access the mapped drive Y.
Here is my state backend configuration, and it writes files to the local C
drive.

“state.backend.fs.checkpointdir: file:///cygdrive/Y/flink-checkpoint-dir
state.checkpoints.dir: /cygdrive/Y/flink-checkpoints
state.backend: filesystem
high-availability.storageDir: file:///cygdrive/Y/flink-recovery
"

I have attached the flink-conf.yaml and screenshots. Is there any wrong
setting?

Thanks,
Jordan

On 7 Nov 2017, at 6:49 PM, Ufuk Celebi  wrote:

Hey Jordan,

yeah, that should just work. Check out the state backend configuration
here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/
ops/state/state_backends.html

– Ufuk

On Tue, Nov 7, 2017 at 11:44 AM, Jordan Kuan  wrote:

Dear Ufuk,

Thank you for your reply.

All the cluster Flink servers are able to access network drive, and it
mapped as drive Y in all nodes.
Do I need to provide more information?

Thanks,
Jordan


On 7 Nov 2017, at 6:36 PM, Ufuk Celebi  wrote:

As answered by David on SO, the files need to be accessible by all
nodes. In your setup this seems not to be the case, therefore it won't
work. You need a distributed file system (e.g. NFS or HDFS) or object
store (e.g. S3) that is accessible from all nodes.

– Ufuk


On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan  wrote:

Dear Flink Dev Team

I have encountered a problem and can't find any solution in Google.
And I have created a thread in stackoverflow.com but no response.
https://stackoverflow.com/questions/47123371/flink-windows-ha

I would really appreciate it if you could give some suggestions to me.

Thanks,

Jordan

On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger  wrote:


Hi,
I would suggest to send your question to the user@flink.apache.org list
(make sure to subscribe first)

-- Forwarded message --
From: 
Date: Mon, Nov 6, 2017 at 5:23 PM
Subject: MODERATE for d...@flink.apache.org
To:
Cc:
dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gma
il@flink.apache.org



To approve:
 dev-accept-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
To reject:
 dev-reject-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
To give a reason to reject:
%%% Start comment
%%% End comment



-- Forwarded message --
From: Jordan Kuan 
To: d...@flink.apache.org
Cc:
Bcc:
Date: Tue, 7 Nov 2017 00:22:53 +0800
Subject: Flink Windows HA Issue.
Dear Flink Dev Team

I have encountered a problem and can't find any solution in Google.
And I have created a thread in stackoverflow.com but no response.
https://stackoverflow.com/questions/47123371/flink-windows-ha

I would really appreciate it if you could give some suggestions to me.

Thanks,

Jordan






--
Best Regards,
Jordan Kuan


flink-conf.yaml
Description: Binary data


Re: Flink send checkpointing message in IT

2017-11-07 Thread Chesnay Schepler
hmm. While there is technically no guarantee that notifyCheckpointComplete 
is called, it virtually always is, especially in local setups.

Is it possible for you to share more code (or all of it)? (you can also 
send it to me directly)


On 07.11.2017 11:58, Rinat wrote:
Yes, but *notifyCheckpointComplete *callback doesn’t called on await 
completion, I do the same, as in specified test template :


ActorGateway jobManager = (ActorGateway) 
Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
Future savepointResultFuture = jobManager.ask(new 
JobManagerMessages.TriggerSavepoint(

jobId, Option.empty()), DEADLINE.timeLeft()
);
while(!savepointResultFuture.isCompleted()) {
System.out.println();
}
Object savepointResult = Await.result(savepointResultFuture, 
DEADLINE.timeLeft());


if (savepointResult instanceof 
JobManagerMessages.TriggerSavepointFailure) {
throw new RuntimeException(String.format("Something went 
wrong while executing savepoint, [message=%s]",

((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause()
));
}

Thx

On 7 Nov 2017, at 13:54, Chesnay Schepler > wrote:


Do you verify that savepointResult is a 
JobManagerMessages.TriggerSavepointSuccess? It could also be 
JobManagerMessages.TriggerSavepointFailure. (instanceof check)


On 02.11.2017 19:11, Rinat wrote:
Chesnay, thanks for your reply, it was very helpful, but I took 
logic from this test template and tried to reuse it in my IT case, 
but found one more issue.
I’ve registered an accumulator in my source function, and for it’s 
value, as specified in the specified example.
When accumulator has an expected value, I perform a savepoint and 
wait for it’s completion using the further code


ActorGateway jobManager = (ActorGateway) 
Await.result(cluster.leaderGateway().future(),DEADLINE.timeLeft());
Future savepointResultFuture = jobManager.ask(new 
JobManagerMessages.TriggerSavepoint(
 jobId, Option.empty()),DEADLINE.timeLeft()
);
Object savepointResult = 
Await.result(savepointResultFuture,DEADLINE.timeLeft());
Afterwards, if failures haven’t been detected I cancels my job and 
shutdowns cluster.


I found, that checkpoint method *notifyCheckpointComplete* not 
always called, before the *savepointResult* is ready. So the part of 
my logic, that lives in implementation of this method doesn’t work 
and test fails.


So could you or someone explain, does *Flink* guaranties, that 
*notifyCheckpointComplete* method will be called before 
*savepointResult * will become accessable.
For me, it’s rather strange behaviour and I think that I’m doing 
something wrong.


Thx.

On 1 Nov 2017, at 14:26, Chesnay Schepler > wrote:


You could trigger a savepoint, which from the viewpoint of 
sources/operators/sinks is the same thing as a checkpoint.


How to do this depends a bit on how your test case is written, but 
you can take a look at the 
SavepointMigrationTestBase#executeAndSavepoint which is all about 
running josb and triggering

savepoints once certain conditions have been met.

On 30.10.2017 16:01, Rinat wrote:

Hi guys, I’ve got a question about working with checkpointing.
I would like to implement IT test, where source is a fixed 
collection of items and sink performs additional logic, when 
checkpointing is completed.


I would like to force executing checkpointing, when all messages 
from my test source were sent and processed by sink.

Please tell me, whether such logic could be performed or not, and how.

Thx !













Re: Flink send checkpointing message in IT

2017-11-07 Thread Rinat
Yes, but the notifyCheckpointComplete callback isn't called by the time the await 
completes. I do the same as in the specified test template:

ActorGateway jobManager = (ActorGateway)
    Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
Future<Object> savepointResultFuture = jobManager.ask(
    new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), DEADLINE.timeLeft());
while (!savepointResultFuture.isCompleted()) {
    System.out.println();
}
Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());

if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
    throw new RuntimeException(String.format(
        "Something went wrong while executing savepoint, [message=%s]",
        ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause()));
}

Thx

> On 7 Nov 2017, at 13:54, Chesnay Schepler  wrote:
> 
> Do you verify that savepointResult is a 
> JobManagerMessages.TriggerSavepointSuccess? It could also be 
> JobManagerMessages.TriggerSavepointFailure. (instanceof check)
> 
> On 02.11.2017 19:11, Rinat wrote:
>> Chesnay, thanks for your reply, it was very helpful, but I took logic from 
>> this test template and tried to reuse it in my IT case, but found one more 
>> issue.
>> I’ve registered an accumulator in my source function, and for it’s value, as 
>> specified in the specified example.
>> When accumulator has an expected value, I perform a savepoint and wait for 
>> it’s completion using the further code
>> 
>> ActorGateway jobManager = (ActorGateway) 
>> Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
>> Future savepointResultFuture = jobManager.ask(new 
>> JobManagerMessages.TriggerSavepoint(
>> jobId, Option.empty()), DEADLINE.timeLeft()
>> );
>> Object savepointResult = Await.result(savepointResultFuture, 
>> DEADLINE.timeLeft());
>> Afterwards, if failures haven’t been detected I cancels my job and shutdowns 
>> cluster.
>> 
>> I found, that checkpoint method notifyCheckpointComplete not always called, 
>> before the savepointResult is ready. So the part of my logic, that lives in 
>> implementation of this method doesn’t work and test fails.
>> 
>> So could you or someone explain, does Flink guaranties, that 
>> notifyCheckpointComplete method will be called before savepointResult  will 
>> become accessable.
>> For me, it’s rather strange behaviour and I think that I’m doing something 
>> wrong.
>> 
>> Thx.
>> 
>>> On 1 Nov 2017, at 14:26, Chesnay Schepler wrote:
>>> 
>>> You could trigger a savepoint, which from the viewpoint of 
>>> sources/operators/sinks is the same thing as a checkpoint.
>>> 
>>> How to do this depends a bit on how your test case is written, but you can 
>>> take a look at the SavepointMigrationTestBase#executeAndSavepoint which is 
>>> all about running josb and triggering
>>> savepoints once certain conditions have been met.
>>> 
>>> On 30.10.2017 16:01, Rinat wrote:
 Hi guys, I’ve got a question about working with checkpointing.
 I would like to implement IT test, where source is a fixed collection of 
 items and sink performs additional logic, when checkpointing is completed.
 
 I would like to force executing checkpointing, when all messages from my 
 test source were sent and processed by sink.
 Please tell me, whether such logic could be performed or not, and how.
 
 Thx !
>>> 
>>> 
>> 
> 



Re: Flink send checkpointing message in IT

2017-11-07 Thread Chesnay Schepler
Do you verify that savepointResult is a 
JobManagerMessages.TriggerSavepointSuccess? It could also be 
JobManagerMessages.TriggerSavepointFailure. (instanceof check)
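
A minimal sketch of that check, reusing the names from the snippet quoted below; the
success branch is only illustrative:

Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());

if (savepointResult instanceof JobManagerMessages.TriggerSavepointSuccess) {
    // the JobManager acknowledged the savepoint as completed
} else if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
    throw new RuntimeException("Savepoint failed",
            ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
}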


On 02.11.2017 19:11, Rinat wrote:
Chesnay, thanks for your reply, it was very helpful, but I took logic 
from this test template and tried to reuse it in my IT case, but found 
one more issue.
I’ve registered an accumulator in my source function, and for it’s 
value, as specified in the specified example.
When accumulator has an expected value, I perform a savepoint and wait 
for it’s completion using the further code


ActorGateway jobManager = (ActorGateway) 
Await.result(cluster.leaderGateway().future(),DEADLINE.timeLeft());
Future savepointResultFuture = jobManager.ask(new 
JobManagerMessages.TriggerSavepoint(
 jobId, Option.empty()),DEADLINE.timeLeft()
);
Object savepointResult = 
Await.result(savepointResultFuture,DEADLINE.timeLeft());
Afterwards, if failures haven’t been detected I cancels my job and 
shutdowns cluster.


I found, that checkpoint method *notifyCheckpointComplete* not always 
called, before the *savepointResult* is ready. So the part of my 
logic, that lives in implementation of this method doesn’t work and 
test fails.


So could you or someone explain, does *Flink* guaranties, that 
*notifyCheckpointComplete* method will be called before 
*savepointResult * will become accessable.
For me, it’s rather strange behaviour and I think that I’m doing 
something wrong.


Thx.

On 1 Nov 2017, at 14:26, Chesnay Schepler > wrote:


You could trigger a savepoint, which from the viewpoint of 
sources/operators/sinks is the same thing as a checkpoint.


How to do this depends a bit on how your test case is written, but 
you can take a look at the 
SavepointMigrationTestBase#executeAndSavepoint which is all about 
running josb and triggering

savepoints once certain conditions have been met.

On 30.10.2017 16:01, Rinat wrote:

Hi guys, I’ve got a question about working with checkpointing.
I would like to implement IT test, where source is a fixed 
collection of items and sink performs additional logic, when 
checkpointing is completed.


I would like to force executing checkpointing, when all messages 
from my test source were sent and processed by sink.

Please tell me, whether such logic could be performed or not, and how.

Thx !









Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Ufuk Celebi
Hey Jordan,

yeah, that should just work. Check out the state backend configuration
here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html

– Ufuk

On Tue, Nov 7, 2017 at 11:44 AM, Jordan Kuan  wrote:
> Dear Ufuk,
>
> Thank you for your reply.
>
> All the cluster Flink servers are able to access network drive, and it mapped 
> as drive Y in all nodes.
> Do I need to provide more information?
>
> Thanks,
> Jordan
>
>
>> On 7 Nov 2017, at 6:36 PM, Ufuk Celebi  wrote:
>>
>> As answered by David on SO, the files need to be accessible by all
>> nodes. In your setup this seems not to be the case, therefore it won't
>> work. You need a distributed file system (e.g. NFS or HDFS) or object
>> store (e.g. S3) that is accessible from all nodes.
>>
>> – Ufuk
>>
>>
>> On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan  wrote:
>>> Dear Flink Dev Team
>>>
>>> I have encountered a problem and can't find any solution in Google.
>>> And I have created a thread in stackoverflow.com but no response.
>>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>>>
>>> I would really appreciate it if you could give some suggestions to me.
>>>
>>> Thanks,
>>>
>>> Jordan
>>>
>>> On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger  wrote:

 Hi,
 I would suggest to send your question to the user@flink.apache.org list
 (make sure to subscribe first)

 -- Forwarded message --
 From: 
 Date: Mon, Nov 6, 2017 at 5:23 PM
 Subject: MODERATE for d...@flink.apache.org
 To:
 Cc:
 dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gmail@flink.apache.org



 To approve:
   dev-accept-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
 To reject:
   dev-reject-1509985381.14933.cigohlkhhjaclimeg...@flink.apache.org
 To give a reason to reject:
 %%% Start comment
 %%% End comment



 -- Forwarded message --
 From: Jordan Kuan 
 To: d...@flink.apache.org
 Cc:
 Bcc:
 Date: Tue, 7 Nov 2017 00:22:53 +0800
 Subject: Flink Windows HA Issue.
 Dear Flink Dev Team

 I have encountered a problem and can't find any solution in Google.
 And I have created a thread in stackoverflow.com but no response.
 https://stackoverflow.com/questions/47123371/flink-windows-ha

 I would really appreciate it if you could give some suggestions to me.

 Thanks,

 Jordan



>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Jordan Kuan
>


Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Jordan Kuan
Dear Ufuk,

Thank you for your reply.

All the Flink servers in the cluster are able to access the network drive, and it 
is mapped as drive Y: on all nodes.
Do I need to provide more information?

Thanks,
Jordan


> On 7 Nov 2017, at 6:36 PM, Ufuk Celebi  wrote:
> 
> As answered by David on SO, the files need to be accessible by all
> nodes. In your setup this seems not to be the case, therefore it won't
> work. You need a distributed file system (e.g. NFS or HDFS) or object
> store (e.g. S3) that is accessible from all nodes.
> 
> – Ufuk
> 
> 
> On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan  wrote:
>> Dear Flink Dev Team
>> 
>> I have encountered a problem and can't find any solution in Google.
>> And I have created a thread in stackoverflow.com but no response.
>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>> 
>> I would really appreciate it if you could give some suggestions to me.
>> 
>> Thanks,
>> 
>> Jordan
>> 
>> On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger  wrote:
>>> 
>>> Hi,
>>> I would suggest to send your question to the user@flink.apache.org list
>>> (make sure to subscribe first)
>>> 
>>> -- Forwarded message --
>>> From: 
>>> Date: Mon, Nov 6, 2017 at 5:23 PM
>>> Subject: MODERATE for d...@flink.apache.org
>>> To:
>>> Cc:
>>> dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gmail@flink.apache.org
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> -- Forwarded message --
>>> From: Jordan Kuan 
>>> To: d...@flink.apache.org
>>> Cc:
>>> Bcc:
>>> Date: Tue, 7 Nov 2017 00:22:53 +0800
>>> Subject: Flink Windows HA Issue.
>>> Dear Flink Dev Team
>>> 
>>> I have encountered a problem and can't find any solution in Google.
>>> And I have created a thread in stackoverflow.com but no response.
>>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>>> 
>>> I would really appreciate it if you could give some suggestions to me.
>>> 
>>> Thanks,
>>> 
>>> Jordan
>>> 
>>> 
>>> 
>> 
>> 
>> 
>> --
>> Best Regards,
>> Jordan Kuan



Re: MODERATE for d...@flink.apache.org

2017-11-07 Thread Ufuk Celebi
As answered by David on SO, the files need to be accessible by all
nodes. In your setup this seems not to be the case, therefore it won't
work. You need a distributed file system (e.g. NFS or HDFS) or object
store (e.g. S3) that is accessible from all nodes.
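
As a rough illustration only (the ZooKeeper quorum and paths below are 
placeholders, assuming a shared file system that every node can reach under 
the same URI), the relevant flink-conf.yaml entries would look roughly like:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints/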

– Ufuk


On Tue, Nov 7, 2017 at 3:34 AM, Jordan Kuan  wrote:
> Dear Flink Dev Team
>
> I have encountered a problem and can't find any solution in Google.
> And I have created a thread in stackoverflow.com but no response.
> https://stackoverflow.com/questions/47123371/flink-windows-ha
>
> I would really appreciate it if you could give some suggestions to me.
>
> Thanks,
>
> Jordan
>
> On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger  wrote:
>>
>> Hi,
>> I would suggest to send your question to the user@flink.apache.org list
>> (make sure to subscribe first)
>>
>> -- Forwarded message --
>> From: 
>> Date: Mon, Nov 6, 2017 at 5:23 PM
>> Subject: MODERATE for d...@flink.apache.org
>> To:
>> Cc:
>> dev-allow-tc.1509985381.dcccgaimcaiiefbkiapi-jordan.kuan=gmail@flink.apache.org
>>
>>
>>
>>
>>
>>
>> -- Forwarded message --
>> From: Jordan Kuan 
>> To: d...@flink.apache.org
>> Cc:
>> Bcc:
>> Date: Tue, 7 Nov 2017 00:22:53 +0800
>> Subject: Flink Windows HA Issue.
>> Dear Flink Dev Team
>>
>> I have encountered a problem and can't find any solution in Google.
>> And I have created a thread in stackoverflow.com but no response.
>> https://stackoverflow.com/questions/47123371/flink-windows-ha
>>
>> I would really appreciate it if you could give some suggestions to me.
>>
>> Thanks,
>>
>> Jordan
>>
>>
>>
>
>
>
> --
> Best Regards,
> Jordan Kuan


Re: Flink memory leak

2017-11-07 Thread Ufuk Celebi
Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's causing this.

Since multiple jobs are running, it will be hard to tell which job the
state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour
with only a single job?

– Ufuk


On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
 wrote:
> Hi,
>
> We are using Flink 1.3.1 in production, with one job manager and 3 task
> managers in standalone mode. Recently, we've noticed that we have
> memory-related problems. We use Docker containers to serve the Flink cluster.
> We have 300 slots and 20 jobs running with a parallelism of 10; the job count
> may also change over time. Task manager memory usage always increases, and
> after job cancellation this memory usage doesn't decrease. We've tried to
> investigate the problem and have taken a task manager JVM heap snapshot.
> According to the heap analysis, the likely memory leak was a Flink list
> state descriptor, but we are not sure that is the cause of our memory
> problem. How can we solve the problem?


Re: FlinkCEP behaviour with time constraints not as expected

2017-11-07 Thread Ufuk Celebi
Hey Frederico,

let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
the expected behaviour here.

Best,

Ufuk


On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
 wrote:
> Hi everyone,
>
> I wanted to ask whether FlinkCEP is working as it should in the following
> scenario, or whether I have misunderstood how it works.
>
> I've got a keyedstream associated with the following pattern:
>
> Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> .notNext("end").where(_.value >=100).within(Time.minutes(30))
>
> Considering a single key in the stream, for simplicity, I've got the
> following sequence of events (using EventTime on the "time" field of the
> JSON event):
>
> {value: 100, time: "2017-11-05 03:50:02.000"}
> {value: 100, time: "2017-11-05 03:52:02.000"}
> {value: 100, time: "2017-11-05 03:54:02.000"}
> {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the 30
> minutes from the first event
> {value: 100, time: "2017-11-05 06:00:02.000"}
>
> Now, in the select/flatSelect function I tried printing the content of the
> pattern map, and I noticed that, for example, the first two events weren't
> considered part of the same pattern; the map looked like the following:
>
> {start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
> {start=[{value: 100, time: 2017-11-05 03:52:02.000}]}
>
> Now, shouldn't they be in the same List, as they belong to the same
> iterative pattern, defined with the oneOrMore clause?
>
> Thank you for your insight,
> Federico D'Ambrosio


Flink memory leak

2017-11-07 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU

Hi,

We are using Flink 1.3.1 in production, with one job manager and 3 
task managers in standalone mode. Recently, we've noticed that we have 
memory-related problems. We use Docker containers to serve the Flink cluster. 
We have 300 slots and 20 jobs running with a parallelism of 10; the 
job count may also change over time. Task manager memory usage always 
increases, and after job cancellation this memory usage doesn't decrease. 
We've tried to investigate the problem and have taken a task manager 
JVM heap snapshot. According to the heap analysis, the likely memory 
leak was a Flink list state descriptor, but we are not sure that is the 
cause of our memory problem. How can we solve the problem?


Re: ExecutionGraph not serializable

2017-11-07 Thread Till Rohrmann
Hi XiangWei,

How do you use the JobMasterGateway with the actor message RequestJob? The
JobMasterGateway is a Java interface and does not represent an ActorCell to
which you can send actor messages. Instead, you should call
JobMasterGateway#requestArchivedExecutionGraph.
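
As a rough sketch only (the timeout argument, the CompletableFuture result and 
the surrounding helper class are assumptions on my side, not the exact API):

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;

public class ArchivedGraphFetcher {
    // ask the JobMaster for the archived execution graph via its RPC gateway
    public static ArchivedExecutionGraph fetch(JobMasterGateway gateway) throws Exception {
        CompletableFuture<ArchivedExecutionGraph> future =
                gateway.requestArchivedExecutionGraph(Time.seconds(10));
        return future.get();
    }
}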

Cheers,
Till

On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske  wrote:

> Hi XiangWei,
>
> I don't think this is a public interface, but Till (in CC) might know
> better.
>
> Best,
> Fabian
>
> 2017-11-06 3:27 GMT+01:00 XiangWei Huang :
>
>> Hi Flink users,
>> The Flink JobManager throws a NotSerializableException when I use the
>> JobMasterGateway to get the ExecutionGraph of a specific job with the
>> message *RequestJob(jobID)*. Below is the detail of the exception:
>>
>>
>> [ERROR] [akka.remote.EndpointWriter] - Transient association error 
>> (association remains live)java.io.NotSerializableException: 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>  at 
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>  at 
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>  at 
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>  at 
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>  at 
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>  at 
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>  at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>  at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>  at 
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>  at 
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>  at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
>>  at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
>>  at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>  at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>  at 
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> So, is this a bug, or is this way of getting the job's ExecutionGraph invalid?
>>
>>
>> Best,XiangWei
>>
>>
>>
>


Re: ExecutionGraph not serializable

2017-11-07 Thread Fabian Hueske
Hi XiangWei,

I don't think this is a public interface, but Till (in CC) might know
better.

Best,
Fabian

2017-11-06 3:27 GMT+01:00 XiangWei Huang :

> Hi Flink users,
> The Flink JobManager throws a NotSerializableException when I use the
> JobMasterGateway to get the ExecutionGraph of a specific job with the
> message *RequestJob(jobID)*. Below is the detail of the exception:
>
>
> [ERROR] [akka.remote.EndpointWriter] - Transient association error 
> (association remains live)java.io.NotSerializableException: 
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at 
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>   at 
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>   at 
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>   at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>   at 
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>   at 
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
>   at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
>   at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> So, is this a bug, or is this way of getting the job's ExecutionGraph invalid?
>
>
> Best,XiangWei
>
>
>


Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-07 Thread Fabian Hueske
Hi Ashish,

Gordon (in CC) might be able to help you.

Cheers, Fabian

2017-11-05 16:24 GMT+01:00 Ashish Pokharel :

> All,
>
> I am starting to notice strange behavior in a particular streaming app.
> I initially thought it was a producer issue, as I was seeing timeout
> exceptions (records expiring in the queue). I did try to adjust
> request.timeout.ms, linger.ms etc. in case the issue was caused by a
> sudden burst of data or something along those lines. However, that caused
> the app to build up back pressure and become slower and slower until the
> timeout was reached. With lower timeouts, the app would actually raise the
> exception and recover faster. I can tell it is not related to connectivity,
> as other apps connected to the same brokers from the same data nodes are
> running just fine around the same time frame (we have at least 10 streaming
> apps connected to the same list of brokers). We have enabled the Graphite
> reporter in all of our applications. After deep diving into some of the
> consumer and producer stats, I noticed that the consumer fetch-rate drops
> tremendously while the fetch-size grows exponentially BEFORE the producer
> actually starts to show higher response times and lower rates. Eventually,
> I noticed connection resets start to occur and connection counts go up
> momentarily, after which things get back to normal. Data producer rates
> remain constant around that timeframe - we have a Logstash producer sending
> data over. We checked both the Logstash and Kafka metrics and they seem to
> show the same pattern (a sort of sine wave) throughout.
>
> It seems to point to a Kafka issue (perhaps some tuning between the Flink
> app and Kafka), but I wanted to check with the experts before I start
> knocking on the Kafka admins' doors. Is there anything else I can look
> into? There are quite a few default stats in Graphite, but those were the
> ones that made the most sense.
>
> Thanks, Ashish
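
For reference, a minimal sketch of where those producer settings are applied 
on the Flink side (the topic name, broker list, timeout values and 
serialization schema are placeholders, not values from this job):

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class TunedKafkaSink {
    public static void attach(DataStream<String> stream) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
        producerProps.setProperty("request.timeout.ms", "30000"); // fail faster instead of queueing
        producerProps.setProperty("linger.ms", "5");              // small batching delay
        stream.addSink(new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), producerProps));
    }
}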