RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-13 Thread Gwenhael Pasquiers
From what I understood, in your case you might solve your issue by using
specific key classes instead of Strings.

Maybe you could create key classes with a user-specified hashcode that takes
the previous key's hashcode as its value. That way your data shouldn't be sent
over the wire and should stay in the same partition, and thus on the same
taskmanager.
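A minimal sketch of such a key class (the class and field names are made up for the
example, and whether a record really stays on the same taskmanager still depends on
how Flink maps key hashes to key groups, so treat it as something to experiment with):

import java.util.Objects;

public class PinnedKey {

    private final String originalKey;
    private final int pinnedHash; // hashcode of the previous key, supplied by the caller

    public PinnedKey(String originalKey, int pinnedHash) {
        this.originalKey = originalKey;
        this.pinnedHash = pinnedHash;
    }

    public String getOriginalKey() {
        return originalKey;
    }

    @Override
    public int hashCode() {
        // reuse the previous key's hashcode so keyBy() resolves to the same partition
        return pinnedHash;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PinnedKey)) return false;
        PinnedKey other = (PinnedKey) o;
        return pinnedHash == other.pinnedHash && Objects.equals(originalKey, other.originalKey);
    }
}

It would then be used from a KeySelector that builds the PinnedKey out of the record
and the hash of the key the record was previously partitioned by.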


Re: Metric Registry Warnings

2017-11-13 Thread Fabian Hueske
Hi Ashish,

this is a known issue and has been fixed for the next version [1].

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7100

2017-11-11 16:02 GMT+01:00 Ashish Pokharel :

> All,
>
> Hopefully this is a quick one. I enabled Graphite reporter in my App and I
> started to see the following warning messages all over the place:
>
> 2017-11-07 20:54:09,288 WARN  org.apache.flink.runtime.
> metrics.MetricRegistry   - Error while registering metric.
> java.lang.IllegalArgumentException: A metric named
> flink.taskmanager.pndapoc-cdh-dn-14.8823e32fae717d08e211fceec56479b7.normalizeData.parseRawStats
> -> Filter.numBytesOut already exists
>
> I saw some threads about this regarding JMX as well but I don’t think I
> see a resolution for it.
>
> One thing I made sure of was that I haven’t reused a name (like parseRawStats) in
> my App. Also, this seems to happen for every metric, not only for a select
> few where I could have mistakenly set the same name.
>
> Appreciate any help on this.
>
> Thanks, Ashish
>


Re: readFile, DataStream

2017-11-13 Thread Kostas Kloudas
Hi Juan,

The problem is that once a file with a certain modification timestamp is processed and the 
global modification timestamp is updated, 
all other files with that same timestamp are considered processed as well.

The solution is not to simply remove the '=' from the modificationTime <= 
globalModificationTime check in ContinuousFileMonitoringFunction, as this 
would lead to duplicates. 
The solution, in my opinion, is to keep a list of the filenames (or hashes) of 
the files processed for the last globalModTimestamp (and only for that 
timestamp), and when new files arrive with the same timestamp, check whether 
their names are already in that list. 

This way you pay a bit of memory but you get what you want.
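A rough sketch of that bookkeeping (the method and field names are made up and this is
not the actual ContinuousFileMonitoringFunction code; the set would also have to be part
of the function's checkpointed state):

import java.util.HashSet;
import java.util.Set;

private long globalModificationTimestamp = Long.MIN_VALUE;
private final Set<String> processedForMaxTimestamp = new HashSet<>();

private boolean shouldProcess(String filePath, long modificationTime) {
    if (modificationTime < globalModificationTimestamp) {
        return false; // strictly older: already handled in an earlier scan
    }
    if (modificationTime == globalModificationTimestamp) {
        // same timestamp as the current maximum: only accept files we have not seen yet
        return processedForMaxTimestamp.add(filePath);
    }
    // strictly newer timestamp: advance the maximum and reset the bookkeeping
    globalModificationTimestamp = modificationTime;
    processedForMaxTimestamp.clear();
    processedForMaxTimestamp.add(filePath);
    return true;
}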

What do you think?

Thanks,
Kostas

> On Nov 10, 2017, at 12:54 PM, Juan Miguel Cejuela  wrote:
> 
> Hi there,
> 
> I’m trying to watch a directory for new incoming files (with 
> StreamExecutionEnvironment#readFile) with a subsecond latency (interval watch 
> of ~100ms, and using the flag FileProcessingMode.PROCESS_CONTINUOUSLY).
> 
> If many files come in within (under) the watch interval, Flink 
> doesn’t seem to notice the files, and as a result, the files do not 
> get processed. The behavior also seems non-deterministic, as it likely depends 
> on timeouts and so on. For example, 10 new files come in at once (that 
> is, essentially in parallel) and perhaps 3 files get processed, but the remaining 
> 7 don’t.
> 
> I’ve extended FileInputFormat with my own implementation, which does little 
> more than log in the open function when a new file comes in. That’s how I 
> know that many files get lost.
> 
> On the other hand, when I restart flink, all the files in the directory are 
> immediately processed. This is the expected behavior and works fine.
> 
> The situation of unprocessed files is a bummer.
> 
> Am I doing something wrong? Do I need to set something in the configuration? 
> Is it a bug in Flink?
> 
> Hopefully I described my problem clearly.
> 
> Thank you.
> 



Re: Flink drops messages?

2017-11-13 Thread Fabian Hueske
Hi Andrea,

you are right. Flink's window operators can drop messages which are too
late, i.e., have a timestamp smaller than the last watermark.
This is expected behavior and documented at several places [1] [2].

There are a couple of options for dealing with late elements:

1. Use more conservative watermarks. This will add latency to your program.
2. Configure an allowedLateness parameter for windows, but then you have to be able
to handle the resulting updates (see the sketch below). [2]
3. Use side outputs on windows (will become available with Flink 1.4). [3]
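As a rough illustration of options 2 and 3 (the side output API is only available from
Flink 1.4 on; the tag name, the lateness value and the SortingWindowFunction stand-in
are made up for the example and would have to be adapted to your job):

final OutputTag<Harness.KafkaRecord> lateTag =
        new OutputTag<Harness.KafkaRecord>("late-records") {};

SingleOutputStreamOperator<Harness.KafkaRecord> windowed = keyedStream
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .allowedLateness(Time.seconds(5))      // option 2: keep window state a bit longer
        .sideOutputLateData(lateTag)           // option 3 (Flink 1.4+): capture dropped records
        .apply(new SortingWindowFunction());   // stand-in for your existing WindowFunction

DataStream<Harness.KafkaRecord> lateRecords = windowed.getSideOutput(lateTag);

Records that arrive later than watermark + allowedLateness would then only show up in
the side output (or be dropped without it).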

Cheers, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

2017-11-12 21:29 GMT+01:00 AndreaKinn :

> Hi,
> I'm running a Flink application where data are retrieved from a Kafka
> broker
> and forwarded to a Cassandra sink.
> I've implemented the following watermark emitter:
>
> public class CustomTimestampExtractor implements
> AssignerWithPeriodicWatermarks String, Double, Double, Double>>{
>
> private final long maxOutOfOrderness = 800;
> private long currentMaxTimestamp;
>
> @Override
> public long extractTimestamp(Tuple8 String,
> Double, Double, Double> element, long previousElementTimestamp) {
> long timestamp = element.f2.getTime();
> currentMaxTimestamp = Math.max(timestamp,
> currentMaxTimestamp);
> return timestamp;
> }
>
> @Override
> public Watermark getCurrentWatermark() {
> return new Watermark(currentMaxTimestamp -
> maxOutOfOrderness);
> }
> }
>
> While I have implemented a record reordering in windows on event time
> basis:
>
> ...
> .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
> .apply(new WindowFunction Harness.KafkaRecord,
> String, TimeWindow>() {
>
> public void apply(String key,
> TimeWindow window,
>
> Iterable input,
>
> Collector out)
>
> throws Exception {
>
>
> ArrayList list = new
> ArrayList();
>
> for (Harness.KafkaRecord
> in: input)
> list.add(in);
> Collections.sort(list);
> for(Harness.KafkaRecord
> output: list)
>
> out.collect(output);
> }
> });
>
> Unfortunately when I check Cassandra's destination table size I note that
> some messages are lost.
>
> Performing 3 tests I have ingested data at 50, 25 and 15 Hz. I expected to
> see lower loss percentage with the lower ingestion frequency, instead it is
> the opposite!!
>
> P.S.: Kafka ingests 45.000 messages of 1Kb each one, following the loss
> percentage:
>
> 50 Hz: 0.273%
> 25 Hz: 0.284%
> 15 Hz: 0.302%
>
> My suspect is that the data are lost because they arrive with a too high
> lateness and they are dropped by Flink. Is it a possibility?
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-13 Thread Nico Kruber
Hi Shankara,
can you give us some more details, e.g.
- how do you run the job?
- how do you add/include the jar with the missing class?
- is that jar file part of your program's jar or separate?
- is the missing class, i.e. "com.huawei.ccn.intelliom.ims.MeasurementTable$measurementTable" 
(an inner class starting in lower-case?), really in the jar file? It might be a 
wrongly generated protobuf class ...


Nico

On Tuesday, 7 November 2017 15:34:35 CET Shankara wrote:
> Hi,
> 
> I am using flink 2.1.0 version and protobuf-java 2.6.1 version.
> I am getting below exception for protobuf generated class. I have included
> jar which is having that class.
> 
> Can you please help me to check it.
> 
> org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException
> : Could not forward element to next operator
>   at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>   at
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invo
> keProcessElement(Unknown Source)
>   at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoF
> nRunner.java:177) at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunne
> r.java:141) at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processEle
> ment(DoFnRunnerWithMetricsUpdate.java:65) at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.pr
> ocessElement(DoFnOperator.java:368) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .pushToOperator(OperatorChain.java:528) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:503) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:483) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:891) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:869) at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(Timest
> ampedCollector.java:51) at
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$StripIdsMap
> .flatMap(FlinkStreamingTransformTranslators.java:213) at
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$StripIdsMap
> .flatMap(FlinkStreamingTransformTranslators.java:207) at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(Stream
> FlatMap.java:50) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .pushToOperator(OperatorChain.java:528) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:503) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:483) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:891) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:869) at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermar
> kContext.processAndCollectWithTimestamp(StreamSourceContexts.java:309) at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkConte
> xt.collectWithTimestamp(StreamSourceContexts.java:408) at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSou
> rceWrapper.emitElement(UnboundedSourceWrapper.java:329) at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSou
> rceWrapper.run(UnboundedSourceWrapper.java:267) at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:
> 87) at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:
> 55) at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTa
> sk.java:95) at
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(Stopp
> ableSourceStreamTask.java:39) at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:2
> 63) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException
> : Could not forward element to next operator
>   at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .pushToOperator(OperatorChain.java:530) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:503) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:483) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> 

Flink vs Spark streaming benchmark

2017-11-13 Thread G.S.Vijay Raajaa
Hi Guys,

I have been using Flink for quite sometime now and recently I hit upon a
benchmark result that was published in Data bricks.

Would love to hear your thoughts -
https://databricks.com/blog/2017/10/11/benchmarking-structured-streaming-on-databricks-runtime-against-state-of-the-art-streaming-systems.html

Regards,
Vijay Raajaa G S


Re: AvroParquetWriter may cause task managers to get lost

2017-11-13 Thread Fabian Hueske
Hi Ivan,

I don't have much experience with Avro, but extracting the schema and
creating a writer for each record sounds like a pretty expensive approach.
This might result in significant load and increased GC activity.

Do all records have a different schema or might it make sense to cache the
writers in a weak hashmap?
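Purely as an illustration of that caching idea (untested; a plain HashMap is used here
just to keep the sketch short where a weak hashmap was suggested, and whether several
writers pointing at the same path make sense for your job is a separate question):

// additional imports needed: java.util.HashMap, java.util.Map, org.apache.avro.Schema
private transient Map<Schema, ParquetWriter<GenericRecord>> writersBySchema;

@Override
public void write(SlottedMeasurements slot) throws IOException {
    if (writersBySchema == null) {
        writersBySchema = new HashMap<>();
    }
    for (GenericRecord measurement : slot.getMeasurements()) {
        Schema schema = measurement.getSchema();
        ParquetWriter<GenericRecord> writer = writersBySchema.get(schema);
        if (writer == null) {
            // build the expensive writer only once per distinct schema
            writer = AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                .withDictionaryEncoding(true)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .build();
            writersBySchema.put(schema, writer);
        }
        writer.write(measurement);
    }
}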

Best, Fabian


2017-11-07 19:51 GMT+01:00 Ivan Budincevic :

> Hi all,
>
>
>
> We recently implemented a feature in our streaming flink job in which we
> have a AvroParquetWriter which we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called.
> We had to do this because the schema of each record is potentially
> different and we have to get the schema for the AvroParquetWriter out of
> the record itself first. Previously this builder was built only one time in
> the “open” method and from then only the write method was called per
> record.
>
>
>
> Since implementing this our job crashes with “Connection unexpectedly
> closed by remote task manager ‘internal company url’. This might indicate
> that the remote task manager was lost.”
>
>
>
> We did not run into any issues on our test environments, so we are
> suspecting this problem occurs only on higher loads as we have on our
> production environment. Unfortunately we still don’t have a proper means of
> reproducing this much load on our test environment to debug.
>
>
>
> Would having the AvroParquetWriter being built on every write be causing
> the problem and if so why would that be the case?
>
>
>
> Any help in getting to the bottom of the issue would be really
> appreciated. Below there is a code snippet of the class which uses the
> AvroParquetWriter.
>
>
>
> Best regards,
>
> Ivan Budincevic
>
> Software engineer, bol.com
>
> Netherlands
>
>
>
> package com.bol.measure.timeblocks.files;
>
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> import java.io.IOException;
>
> public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
>   private transient ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
>
>   public SlottedMeasurementsWriter(boolean overwrite) {
> this.overwrite = overwrite;
>   }
>
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
> this.path = path;
>   }
>
>   @Override
>   public long flush() throws IOException {
> return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public long getPos() throws IOException {
> return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public void close() throws IOException {
> parquetWriter.close();
>   }
>
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
>
> final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
>   AvroParquetWriter
>     .builder(path)
>     .withSchema(slot.getMeasurements().get(0).getSchema())
>     .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
>     .withDictionaryEncoding(true)
>     .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
> if (overwrite) {
>   writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
> }
>
> parquetWriter = writerBuilder.build();
>
> for (GenericRecord measurement : slot.getMeasurements()) {
>   parquetWriter.write(measurement);
> }
>   }
>
>
>   @Override
>   public Writer duplicate() {
> return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
>
>
>
>
>


Re: AvroParquetWriter may cause task managers to get lost

2017-11-13 Thread Nico Kruber
Hi Ivan,
sure, the more work you do per record, the slower the sink will be. However, 
this should not influence (much) the liveness checks inside flink.
Do you get some meaningful entries in the TaskManagers' logs indicating the 
problem?

I'm no expert on Avro and don't know how much actual work it is to create such 
a writer, but from the code you gave:
- wouldn't your getPos() circumvent the BucketingSink's rolling file property? 
- similarly for flush() which may be dangerous during recovery (judging from 
its documentation - "returns the offset that the file must be truncated to at 
recovery")?


Nico

On Tuesday, 7 November 2017 19:51:35 CET Ivan Budincevic wrote:
> Hi all,
> 
> We recently implemented a feature in our streaming flink job in which we
> have a AvroParquetWriter which we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called. We
> had to do this because the schema of each record is potentially different
> and we have to get the schema for the AvroParquetWriter out of the record
> itself first. Previously this builder was built only one time in the “open”
> method and from then only the write method was called per record.
 
> Since implementing this our job crashes with “Connection unexpectedly closed
> by remote task manager ‘internal company url’. This might indicate that the
> remote task manager was lost.”
 
> We did not run into any issues on our test environments, so we are
> suspecting this problem occurs only on higher loads as we have on our
> production environment. Unfortunately we still don’t have a proper means of
> reproducing this much load on our test environment to debug.
 
> Would having the AvroParquetWriter being built on every write be causing the
> problem and if so why would that be the case?
 
> Any help in getting to the bottom of the issue would be really appreciated.
> Below there is a code snippet of the class which uses the
> AvroParquetWriter.
 
> Best regards,
> Ivan Budincevic
> Software engineer, bol.com
> Netherlands
> 
> package com.bol.measure.timeblocks.files;
> 
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> 
> import java.io.IOException;
> 
> public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
>   private transient ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
> 
>   public SlottedMeasurementsWriter(boolean overwrite) {
> this.overwrite = overwrite;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
> this.path = path;
>   }
> 
>   @Override
>   public long flush() throws IOException {
> return parquetWriter.getDataSize();
>   }
> 
>   @Override
>   public long getPos() throws IOException {
> return parquetWriter.getDataSize();
>   }
> 
>   @Override
>   public void close() throws IOException {
> parquetWriter.close();
>   }
> 
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
> 
> final AvroParquetWriter.Builder writerBuilder =
>   AvroParquetWriter
> .builder(path)
> .withSchema(slot.getMeasurements().get(0).getSchema())
> .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
> .withDictionaryEncoding(true)
> .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
> if (overwrite) {
>   writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
> }
> 
> parquetWriter = writerBuilder.build();
> 
> for (GenericRecord measurement : slot.getMeasurements()) {
>   parquetWriter.write(measurement);
> }
>   }
> 
> 
>   @Override
>   public Writer duplicate() {
> return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
> 
> 





Re: Flink HA Zookeeper Connection Timeout

2017-11-13 Thread Nico Kruber
Hi Sathya,
have you checked this yet?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html

I'm no expert on the HA setup, have you also tried Flink 1.3 just in case?
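For reference, a minimal HA block in flink-conf.yaml could look roughly like the one
below (paths, the cluster id and the timeout values are placeholders, and the exact
client timeout keys should be double-checked against the linked docs):

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper.system.svc.cluster.local:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-flink-cluster
high-availability.storageDir: hdfs:///flink/ha/
# ZooKeeper client timeouts in milliseconds; raising them can help when the
# 15000 ms connection timeout from the log above is too tight
high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 30000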


Nico

On Wednesday, 8 November 2017 04:02:47 CET Sathya Hariesh Prakash (sathypra) 
wrote:
> Hi – We’re currently testing Flink HA and running into a zookeeper timeout
> issue. Error log below.
 
> Is there a production checklist or any information on parameters that are
> related to flink HA that I need to pay attention to?
 
> Any pointers would really help. Please let me know if any additional
> information is needed. Thanks!
 
> NOTE: I see multiple connection timeout messages. With different elapsed
> times.
 
> {
>"timeMillis":1510095254557,
>"thread":"Curator-Framework-0",
>"level":"ERROR",
>   
> "loggerName":"org.apache.flink.shaded.org.apache.curator.ConnectionState",
> "message":"Connection timed out for connection string
> (zookeeper.system.svc.cluster.local:2181) and timeout (15000) / elapsed
> (15004)", "thrown":{
>   "commonElementCount":0,
>   "localizedMessage":"KeeperErrorCode = ConnectionLoss",
>   "message":"KeeperErrorCode = ConnectionLoss",
>  
> "name":"org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossExc
> eption",
 "extendedStackTrace":[
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.ConnectionState",
> "method":"checkTimeouts",
> "file":"ConnectionState.java",
> "line":197,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.ConnectionState",
> "method":"getZooKeeper",
> "file":"ConnectionState.java",
> "line":87,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient"
> ,
 "method":"getZooKeeper",
> "file":"CuratorZookeeperClient.java",
> "line":115,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl",
 "method":"performBackgroundOperation",
> "file":"CuratorFrameworkImpl.java",
> "line":806,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl",
 "method":"backgroundOperationsLoop",
> "file":"CuratorFrameworkImpl.java",
> "line":792,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl",
 "method":"access$300",
> "file":"CuratorFrameworkImpl.java",
> "line":62,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl$4",
 "method":"call",
> "file":"CuratorFrameworkImpl.java",
> "line":257,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
> "class":"java.util.concurrent.FutureTask",
> "method":"run",
> "file":"FutureTask.java",
> "line":266,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  },
>  {
> "class":"java.util.concurrent.ThreadPoolExecutor",
> "method":"runWorker",
> "file":"ThreadPoolExecutor.java",
> "line":1142,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  },
>  {
> "class":"java.util.concurrent.ThreadPoolExecutor$Worker",
> "method":"run",
> "file":"ThreadPoolExecutor.java",
> "line":617,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  },
>  {
> "class":"java.lang.Thread",
> "method":"run",
> "file":"Thread.java",
> "line":745,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  }
>   ]
>},
>"endOfBatch":false,
>"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger",
>"threadId":258,
>"threa

Re: Flink drops messages?

2017-11-13 Thread Kien Truong
Getting late elements from side-output is already available with Flink 
1.3 :)


Regards,

Kien

On 11/13/2017 5:00 PM, Fabian Hueske wrote:

Hi Andrea,

you are right. Flink's window operators can drop messages which are 
too late, i.e., have a timestamp smaller than the last watermark.

This is expected behavior and documented at several places [1] [2].

There are a couple of options how to deal with late elements:

1. Use more conservative watermarks. This will add latency to your program
2. Configure an allowedLateness parameter for windows but have to be 
able to handle respective updates. [2]

3. Use side outputs on windows (will become available with Flink 1.4) [3]

Cheers, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output


2017-11-12 21:29 GMT+01:00 AndreaKinn >:


Hi,
I'm running a Flink application where data are retrieved from a
Kafka broker
and forwarded to a Cassandra sink.
I've implemented the following watermark emitter:

public class CustomTimestampExtractor implements
AssignerWithPeriodicWatermarks>{

    private final long maxOutOfOrderness = 800;
    private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple8 element, long previousElementTimestamp) {
                long timestamp = element.f2.getTime();
                currentMaxTimestamp = Math.max(timestamp,
currentMaxTimestamp);
                return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp -
maxOutOfOrderness);
        }
}

While I have implemented a record reordering in windows on event
time basis:

...
.window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
                                .apply(new
WindowFunction() {

                                        public void apply(String key,
TimeWindow window,
Iterable input,
Collector out)
            throws Exception {

ArrayList list = new
ArrayList();

                                                for
(Harness.KafkaRecord in: input)
list.add(in);
Collections.sort(list);
for(Harness.KafkaRecord output: list)
out.collect(output);
                                        }
                                });

Unfortunately when I check Cassandra's destination table size I
note that
some messages are lost.

Performing 3 tests I have ingested data at 50, 25 and 15 Hz. I
expected to
see lower loss percentage with the lower ingestion frequency,
instead it is
the opposite!!

P.S.: Kafka ingests 45.000 messages of 1Kb each one, following the
loss
percentage:

50 Hz: 0.273%
25 Hz: 0.284%
15 Hz: 0.302%

My suspect is that the data are lost because they arrive with a
too high
lateness and they are dropped by Flink. Is it a possibility?




--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Metric Registry Warnings

2017-11-13 Thread ashish pok
Thanks Fabian!

Sent from Yahoo Mail on Android 
 
  On Mon, Nov 13, 2017 at 4:44 AM, Fabian Hueske wrote:

Hi Ashish,

this is a known issue and has been fixed for the next version [1].

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7100

2017-11-11 16:02 GMT+01:00 Ashish Pokharel :

All,
Hopefully this is a quick one. I enabled Graphite reporter in my App and I 
started to see the following warning messages all over the place:
2017-11-07 20:54:09,288 WARN  org.apache.flink.runtime.metrics.MetricRegistry - Error while registering metric.
java.lang.IllegalArgumentException: A metric named
flink.taskmanager.pndapoc-cdh-dn-14.8823e32fae717d08e211fceec56479b7.normalizeData.parseRawStats
-> Filter.numBytesOut already exists
I saw some threads about this regarding JMX as well but I don’t think I see a 
resolution for it. 
One thing I made sure of was that I haven’t reused a name (like parseRawStats) in my App. 
Also, this seems to happen for every metric, not only for a select few where I 
could have mistakenly set the same name.
Appreciate any help on this.
Thanks, Ashish

  


Re: Flink drops messages?

2017-11-13 Thread Fabian Hueske
Thanks for the correction! :-)

2017-11-13 13:05 GMT+01:00 Kien Truong :

> Getting late elements from side-output is already available with Flink 1.3
> :)
>
> Regards,
>
> Kien
> On 11/13/2017 5:00 PM, Fabian Hueske wrote:
>
> Hi Andrea,
>
> you are right. Flink's window operators can drop messages which are too
> late, i.e., have a timestamp smaller than the last watermark.
> This is expected behavior and documented at several places [1] [2].
>
> There are a couple of options how to deal with late elements:
>
> 1. Use more conservative watermarks. This will add latency to your program
> 2. Configure an allowedLateness parameter for windows but have to be able
> to handle respective updates. [2]
> 3. Use side outputs on windows (will become available with Flink 1.4) [3]
>
> Cheers, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> 2017-11-12 21:29 GMT+01:00 AndreaKinn :
>
>> Hi,
>> I'm running a Flink application where data are retrieved from a Kafka
>> broker
>> and forwarded to a Cassandra sink.
>> I've implemented the following watermark emitter:
>>
>> public class CustomTimestampExtractor implements
>> AssignerWithPeriodicWatermarks> String, Double, Double, Double>>{
>>
>> private final long maxOutOfOrderness = 800;
>> private long currentMaxTimestamp;
>>
>> @Override
>> public long extractTimestamp(Tuple8> String, String,
>> Double, Double, Double> element, long previousElementTimestamp) {
>> long timestamp = element.f2.getTime();
>> currentMaxTimestamp = Math.max(timestamp,
>> currentMaxTimestamp);
>> return timestamp;
>> }
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>> return new Watermark(currentMaxTimestamp -
>> maxOutOfOrderness);
>> }
>> }
>>
>> While I have implemented a record reordering in windows on event time
>> basis:
>>
>> ...
>> .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
>> .apply(new 
>> WindowFunction> Harness.KafkaRecord,
>> String, TimeWindow>() {
>>
>> public void apply(String key,
>> TimeWindow window,
>>
>> Iterable input,
>>
>> Collector out)
>>
>> throws Exception {
>>
>>
>> ArrayList list = new
>> ArrayList();
>>
>> for (Harness.KafkaRecord
>> in: input)
>> list.add(in);
>> Collections.sort(list);
>> for(Harness.KafkaRecord
>> output: list)
>>
>> out.collect(output);
>> }
>> });
>>
>> Unfortunately when I check Cassandra's destination table size I note that
>> some messages are lost.
>>
>> Performing 3 tests I have ingested data at 50, 25 and 15 Hz. I expected to
>> see lower loss percentage with the lower ingestion frequency, instead it
>> is
>> the opposite!!
>>
>> P.S.: Kafka ingests 45.000 messages of 1Kb each one, following the loss
>> percentage:
>>
>> 50 Hz: 0.273%
>> 25 Hz: 0.284%
>> 15 Hz: 0.302%
>>
>> My suspect is that the data are lost because they arrive with a too high
>> lateness and they are dropped by Flink. Is it a possibility?
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Streaming : a way to "key by partition id" without redispatching data

2017-11-13 Thread Nico Kruber
Hi Gwenhaël,
several functions in Flink require keyed streams because they manage their 
internal state by key. These keys, however, should be independent of the 
current execution and its parallelism so that checkpoints may be restored to 
different levels of parallelism (for re-scaling, see [1]).
Also, different operators, e.g. the source vs. the map, may have a different 
number of parallel tasks in which case you'd need to shuffle the data in order 
to adapt. The same goes for possible differences in the parallelism of the 
Kafka partitions vs. the parallelism you use in Flink.

If, however, all your operators have the same parallelism, doing multiple 
keyBy(0) calls in your program will not re-shuffle the data, because of the 
deterministic assignment of keys to operators.
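To make that last point concrete, a small sketch (the Tuple2<String, Long> records, window
sizes and parallelism are made up; only the keyBy-twice pattern is the point): with the same
parallelism everywhere, the second keyBy(0) resolves to the same subtask as the first one,
so the pre-aggregated records should not leave their taskmanager.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalPreAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // same parallelism for every operator

        // stand-in for the Kafka source
        DataStream<Tuple2<String, Long>> events =
            env.fromElements(Tuple2.of("sensor-1", 1L), Tuple2.of("sensor-2", 1L));

        events
            .keyBy(0)
            .timeWindow(Time.minutes(15))                    // first folding step, reduces the volume
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .keyBy(0)                                        // same keys, same hashes -> same subtasks
            .timeWindow(Time.hours(1))                       // downstream aggregation
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .print();

        env.execute("local pre-aggregation sketch");
    }
}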


Nico

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
On Thursday, 9 November 2017 18:00:13 CET Gwenhael Pasquiers wrote:
> Hello,
> 
> (Flink 1.2.1)
> 
> For performances reasons I'm trying to reduce the volume of data of my
> stream as soon as possible by windowing/folding it for 15 minutes before
> continuing to the rest of the chain that contains keyBys and windows that
> will transfer data everywhere.
> 
> Because of the huge volume of data, I want to avoid "moving" the data
> between partitions as much as possible (not like a naïve KeyBy does). I
> wanted to create a custom ProcessFunction (using timer and state to fold
> data for X minutes) in order to fold my data over itself before keying the
> stream but even ProcessFunction needs a keyed stream...
> 
> Is there a specific "key" value that would ensure me that my data won't be
> moved to another taskmanager (that it's hashcode will match the partition
> it is already in) ? I thought about the subtask id but I doubt I'd be that
> lucky :-)
> 
> Suggestions
> 
> · Wouldn't it be useful to be able to do a "partitionnedKeyBy" that
> would not move data between nodes, for windowing operations that can be
> parallelized.
> 
> o   Something like kafka => partitionnedKeyBy(0) => first folding =>
> keyBy(0) => second folding => 
> 
> · Finally, aren't all streams keyed ? Even if they're keyed by a
> totally arbitrary partition id until the user chooses its own key,
> shouldn't we be able to do a window (not windowAll) or process over any
> normal Stream's partition ?
> 
> B.R.
> 
> Gwenhaël PASQUIERS





Re: JobManager shows TaskManager was lost/killed while TaskManger Process is still running and the network is OK.

2017-11-13 Thread Nico Kruber
From what I read in [1], simply add JVM options to env.java.opts as you would 
when you start a Java program yourself, so setting "-XX:+UseG1GC" should 
enable G1.
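For example, in flink-conf.yaml (the GC logging flags are optional and just an idea for
checking in the TaskManager logs that the collector was actually picked up):

env.java.opts: -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCDateStamps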

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#common-options

On Friday, 15 September 2017 19:36:02 CET AndreaKinn wrote:
> Hi, sorry for reviving this old conversation.
> I have exactly the same problem, can you provide more details about your
> solution?
> Have you used another garbage collector such as G1? How can I set it?
> 
> I've seen in the configuration guidelines that I have to set the option env.java.opts,
> but I don't know which value to insert to enable G1.
> 
> 
> Renkai wrote
> 
> > The ZooKeeper-related logs are logged by user code. I finally found the
> > reason why the taskmanager was lost: I had given the taskmanager a big
> > amount of memory, and the jobmanager declared the taskmanager down while
> > the taskmanager was in a Full GC. Thanks for your help.
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Correlation between data streams/operators and threads

2017-11-13 Thread Shailesh Jain
Thanks, Piotr. I'll try it out and will get back in case of any further
questions.

Shailesh

On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski 
wrote:

> 1.  It’s a little bit more complicated then that. Each operator chain/task
> will be executed in separate thread (parallelism
>  Multiplies that). You can check in web ui how was your job split into
> tasks.
>
> 3. Yes that’s true, this is an issue. To preserve the individual
> watermarks/latencies (assuming that you have some way to calculate them
> individually per each device), you could either:
>
> a) have separate jobs per each device with parallelism 1. Pros:
> independent failures/checkpoints, Cons: resource usage (number of threads
> increases with number of devices, there are also other resources consumed
> by each job), efficiency,
> b) have one job with multiple data streams. Cons: resource usage (threads)
> c) ignore Flink’s watermarks, and implement your own code in place of it.
> You could read all of your data in single data stream, keyBy
> partition/device and manually handle watermarks logic. You could either try
> to wrap CEP/Window operators or copy/paste and modify them to suite your
> needs.
>
> I would start and try out from a). If it work for your cluster/scale then
> that’s fine. If not try b) (would share most of the code with a), and as a
> last resort try c).
>
> Kostas, would you like to add something?
>
> Piotrek
>
> On 9 Nov 2017, at 19:16, Shailesh Jain 
> wrote:
>
> On 1. - is it tied specifically to the number of source operators or to
> the number of Datastream objects created. I mean does the answer change if
> I read all the data from a single Kafka topic, get a Datastream of all
> events, and the apply N filters to create N individual streams?
>
> On 3. - the problem with partitions is that watermarks cannot be different
> per partition, and since in this use case, each stream is from a device,
> the latency could be different (but order will be correct almost always)
> and there are high chances of loosing out on events on operators like
> Patterns which work with windows. Any ideas for workarounds here?
>
>
> Thanks,
> Shailesh
>
> On 09-Nov-2017 8:48 PM, "Piotr Nowojski"  wrote:
>
> Hi,
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>
> Number of threads executing would be roughly speaking equal to of the
> number of input data streams multiplied by the parallelism.
>
> 2.
> Yes, you could dynamically create more data streams at the job startup.
>
> 3.
> Running 1 independent data streams on a small cluster (couple of
> nodes) will definitely be an issue, since even with parallelism set to 1,
> there would be quite a lot of unnecessary threads.
>
> It would be much better to treat your data as a single data input stream
> with multiple partitions. You could assign partitions between source
> instances based on parallelism. For example with parallelism 6:
> - source 0 could get partitions 0, 6, 12, 18
> - source 1, could get partitions 1, 7, …
> …
> - source 5, could get partitions 5, 11, ...
>
> Piotrek
>
> On 9 Nov 2017, at 10:18, Shailesh Jain 
> wrote:
>
> Hi,
>
> I'm trying to understand the runtime aspect of Flink when dealing with
> multiple data streams and multiple operators per data stream.
>
> Use case: N data streams in a single flink job (each data stream
> representing 1 device - with different time latencies), and each of these
> data streams gets split into two streams, of which one goes into a bunch of
> CEP operators, and one into a process function.
>
> Questions:
> 1. At runtime, will the engine create one thread per data stream? Or one
> thread per operator?
> 2. Is it possible to dynamically create a data stream at runtime when the
> job starts? (i.e. if N is read from a file when the job starts and
> corresponding N streams need to be created)
> 3. Are there any specific performance impacts when a large number of
> streams (N ~ 1) are created, as opposed to N partitions within a single
> stream?
>
> Are there any internal (design) documents which can help understanding the
> implementation details? Any references to the source will also be really
> helpful.
>
> Thanks in advance.
>
> Shailesh
>
>
>
>
>
>


Re: Correlation between data streams/operators and threads

2017-11-13 Thread Piotr Nowojski
Sure, let us know if you have other questions or encounter some issues.

Thanks, Piotrek

> On 13 Nov 2017, at 14:49, Shailesh Jain  wrote:
> 
> Thanks, Piotr. I'll try it out and will get back in case of any further 
> questions.
> 
> Shailesh
> 
> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski  > wrote:
> 1.  It’s a little bit more complicated then that. Each operator chain/task 
> will be executed in separate thread (parallelism
>  Multiplies that). You can check in web ui how was your job split into tasks.
> 
> 3. Yes that’s true, this is an issue. To preserve the individual 
> watermarks/latencies (assuming that you have some way to calculate them 
> individually per each device), you could either:
> 
> a) have separate jobs per each device with parallelism 1. Pros: independent 
> failures/checkpoints, Cons: resource usage (number of threads increases with 
> number of devices, there are also other resources consumed by each job), 
> efficiency, 
> b) have one job with multiple data streams. Cons: resource usage (threads)
> c) ignore Flink’s watermarks, and implement your own code in place of it. You 
> could read all of your data in single data stream, keyBy partition/device and 
> manually handle watermarks logic. You could either try to wrap CEP/Window 
> operators or copy/paste and modify them to suite your needs. 
> 
> I would start and try out from a). If it work for your cluster/scale then 
> that’s fine. If not try b) (would share most of the code with a), and as a 
> last resort try c).
> 
> Kostas, would you like to add something?
> 
> Piotrek
> 
>> On 9 Nov 2017, at 19:16, Shailesh Jain > > wrote:
>> 
>> On 1. - is it tied specifically to the number of source operators or to the 
>> number of Datastream objects created. I mean does the answer change if I 
>> read all the data from a single Kafka topic, get a Datastream of all events, 
>> and the apply N filters to create N individual streams?
>> 
>> On 3. - the problem with partitions is that watermarks cannot be different 
>> per partition, and since in this use case, each stream is from a device, the 
>> latency could be different (but order will be correct almost always) and 
>> there are high chances of loosing out on events on operators like Patterns 
>> which work with windows. Any ideas for workarounds here?
>> 
>> 
>> Thanks,
>> Shailesh
>> 
>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" > > wrote:
>> Hi,
>> 
>> 1. 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>  
>> 
>> 
>> Number of threads executing would be roughly speaking equal to of the number 
>> of input data streams multiplied by the parallelism.
>> 
>> 2. 
>> Yes, you could dynamically create more data streams at the job startup.
>> 
>> 3.
>> Running 1 independent data streams on a small cluster (couple of nodes) 
>> will definitely be an issue, since even with parallelism set to 1, there 
>> would be quite a lot of unnecessary threads. 
>> 
>> It would be much better to treat your data as a single data input stream 
>> with multiple partitions. You could assign partitions between source 
>> instances based on parallelism. For example with parallelism 6:
>> - source 0 could get partitions 0, 6, 12, 18
>> - source 1, could get partitions 1, 7, …
>> …
>> - source 5, could get partitions 5, 11, ...
>> 
>> Piotrek
>> 
>>> On 9 Nov 2017, at 10:18, Shailesh Jain >> > wrote:
>>> 
>>> Hi,
>>> 
>>> I'm trying to understand the runtime aspect of Flink when dealing with 
>>> multiple data streams and multiple operators per data stream.
>>> 
>>> Use case: N data streams in a single flink job (each data stream 
>>> representing 1 device - with different time latencies), and each of these 
>>> data streams gets split into two streams, of which one goes into a bunch of 
>>> CEP operators, and one into a process function.
>>> 
>>> Questions:
>>> 1. At runtime, will the engine create one thread per data stream? Or one 
>>> thread per operator?
>>> 2. Is it possible to dynamically create a data stream at runtime when the 
>>> job starts? (i.e. if N is read from a file when the job starts and 
>>> corresponding N streams need to be created)
>>> 3. Are there any specific performance impacts when a large number of 
>>> streams (N ~ 1) are created, as opposed to N partitions within a single 
>>> stream?
>>> 
>>> Are there any internal (design) documents which can help understanding the 
>>> implementation details? Any references to the source will also be really 
>>> helpful.
>>> 
>>> Thanks in advance.
>>> 
>>> Shailesh
>>> 
>>> 
>> 
>> 
> 
> 



Re: Off heap memory issue

2017-11-13 Thread Flavio Pompermaier
Unfortunately the issue I've opened [1] was not a problem of Flink but was
just caused by an ever-increasing job plan.
So no help from that... Let's hope to find out the real source of the problem.
Maybe using -Djdk.nio.maxCachedBufferSize could help (but I didn't try it
yet).
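For reference, it could be passed to the Flink JVMs via flink-conf.yaml like below; the
256 KiB value is only an example, not a recommendation, and the property needs Java 8u102
or later:

env.java.opts: -Djdk.nio.maxCachedBufferSize=262144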

Best,
Flavio

[1] https://issues.apache.org/jira/browse/FLINK-7845

On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong 
wrote:

> Hi,
>
> We saw a similar issue in one of our jobs due to a ByteBuffer memory leak [1].
>
> We fixed it using the solution in the article, setting -Djdk.nio.maxCachedBufferSize.
>
> This variable is available for Java 8u102 and later.
>
> Best regards,
>
> Kien
>
> [1]http://www.evanjones.ca/java-bytebuffer-leak.html
>
> On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
>
> We also faced the same problem, but the number of jobs we can run before
> restarting the cluster depends on the volume of the data to shuffle around
> the network. We even had problems with a single job and in order to avoid
> OOM issues we had to put some configuration to limit Netty memory usage,
> i.e.:
>  - Add to flink.yaml -> env.java.opts: -Dio.netty.recycler.maxCapacity.default=1
>  - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g
>
> At this purpose we wrote a small test to reproduce the problem and we
> opened an issue for that [1].
> We still don't know if the problems are related however..
>
> I hope that could be helpful,
> Flavio
>
> [1] https://issues.apache.org/jira/browse/FLINK-7845
>
> On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez 
> wrote:
>
>> Hi Robert,
>>
>> Sorry to reply this late. We did a lot of tests, trying to identify if
>> the problem was in our custom sources/sinks. We figured out that none of
>> our custom components is causing this problem. We came up with a small
>> test, and realized that the Flink nodes run out of non-heap JVM memory and
>> crash after deployment of thousands of jobs.
>>
>> When rapidly deploying thousands or hundreds of thousands of Flink jobs -
>> depending on job complexity in terms of resource consumption - Flink nodes
>> non-heap JVM memory consumption grows until there is no more memory left on
>> the machine and the Flink process crashes. Both TaskManagers and JobManager
>> exhibit the same behavior. The TaskManagers die faster though. The memory
>> consumption doesn't decrease after stopping the deployment of new jobs,
>> with the cluster being idle (no running jobs).
>>
>> We could replicate the behavior by the rapid deployment of the WordCount
>> Job provided in the Quickstart with a Python script.  We started 24
>> instances of the deployment script to run in parallel.
>>
>> The non-heap JVM memory consumption grows faster with more complex jobs,
>> i.e. reading from Kafka 10K events and printing to STDOUT( * ). Thus less
>> deployed jobs are needed until the TaskManagers/JobManager dies.
>>
>> We employ Flink 1.3.2 in standalone mode on AWS EC2 t2.large nodes with
>> 4GB RAM inside Docker containers. For the test, we used 2 TaskManagers and
>> 1 JobManager.
>>
>> ( * ) a slightly changed Python script was used, which waited after
>> deployment 15 seconds for the 10K events to be read from Kafka, then it
>> canceled the freshly deployed job via Flink REST API.
>>
>> If you want we can provide the Scripts and Jobs we used for this test. We
>> have a workaround for this, which restarts the Flink nodes once a memory
>> threshold is reached. But this has lowered the availability of our services.
>>
>> Thanks for your help.
>>
>> On 30 August 2017 at 10:39, Robert Metzger  wrote:
>>
>>> I just saw that your other email is about the same issue.
>>>
>>> Since you've done a heapdump already, did you see any pattern in the
>>> allocated objects? Ideally none of the classes from your user code should
>>> stick around when no job is running.
>>> What's the size of the heap dump? I'm happy to take a look at it if it's
>>> reasonably small.
>>>
>>> On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger 
>>> wrote:
>>>
 Hi Javier,

 I'm not aware of such issues with Flink, but if you could give us some
 more details on your setup, I might get some more ideas on what to look 
 for.

 are you using the RocksDBStateBackend? (RocksDB is doing some JNI
 allocations, that could potentially leak memory)
 Also, are you passing any special garbage collector options? (Maybe
 some classes are not unloaded)
 Are you using anything else that is special (such as protobuf or avro
 formats, or any other big library)?

 Regards,
 Robert



 On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez 
 wrote:

> Hi all,
>
> we are starting a lot of Flink jobs (streaming), and after we have
> started 200 or more jobs we see that the non-heap memory in the
> taskmanagers increases a lot, to the point of killing the instances. We
> found out that every time we start a new job, the committed non-heap 
> memory

Apache Flink - Question about TriggerResult.FIRE

2017-11-13 Thread M Singh
Hi Flink Users,

I have a few questions about triggers:

If a trigger returns TriggerResult.FIRE from, say, the onProcessingTime method,
the window computation is triggered but the elements are kept in the window. If
there is a second invocation of the onProcessingTime method, will the elements from
the previous firing (which were not purged) be part of the new window
computation along with the new events added since the last FIRE event?

Secondly, how does the FIRE option affect the sliding window computation?

If there are any other insights/pitfalls while dealing with this, please let me
know.
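To make the scenario concrete, a minimal custom trigger along these lines could look like
the sketch below (purely illustrative, names made up; it fires on a processing-time timer
without purging, so a later firing would see the previously buffered elements plus anything
added since):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class RepeatedFireTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // schedule a processing-time timer for the end of the window
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // FIRE evaluates the window function but keeps the buffered elements in the window
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}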
Thanks
Mans



Re: Scale by the Bay Flink talks

2017-11-13 Thread Stephan Ewen
Hi Ken!

I'm happy to chat with you after my talk on Saturday at 9am.
http://sched.co/BLwI. I will be at Scale by the Bay until early afternoon.

I am also giving talk and workshop at QCon and am part of a panel there.

I am not joining a Meetup this time, because with the conferences and some
more talks at companies, my schedule was pretty packed already.

Best,
Stephan



On Nov 12, 2017 19:19, "Ken Krugler"  wrote:

Hi all,

I’m going to be at Scale by the Bay later this week, and am looking forward
to hearing about Flink during the formal talks.

But I was wondering if anyone knows whether there will also be informal
talks/meetups on Flink during the event.

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Correlation between data streams/operators and threads

2017-11-13 Thread Shailesh Jain
Hi Piotrek,

I tried out option 'a' mentioned above, but instead of separate jobs, I'm
creating separate streams per device. Following is the test deployment
configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):

akka.client.timeout 15 min
jobmanager.heap.mb 1024
jobmanager.rpc.address localhost
jobmanager.rpc.port 6123
jobmanager.web.port 8081
metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port 8789
metrics.reporters jmx
parallelism.default 1
taskmanager.heap.mb 1024
taskmanager.memory.preallocate false
taskmanager.numberOfTaskSlots 4

The number of Operators per device stream is 4 (one sink function, 3 CEP
operators).

Observations (and questions):

1. No. of threads (captured through JMX) is almost the same as the total
number of operators being created. This clears my original question in this
thread.

2. Even when the number of task slots is 4, on web ui, it shows 3 slots as
free. Is this expected? Why are the subtasks not being distributed across
slots?

3. Job deployment hangs (never switches to RUNNING) when the number of
devices is greater than 5. Even on increasing the akka client timeout, it
does not help. Will separate jobs being deployed per device instead of
separate streams help here?

4. Is there an upper limit on the number of task slots which can be configured? I
know that my operator state size at any given point in time would not be
very high, so it looks OK to deploy independent jobs
on the same task manager across slots.

Thanks,
Shailesh


On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski 
wrote:

> Sure, let us know if you have other questions or encounter some issues.
>
> Thanks, Piotrek
>
>
> On 13 Nov 2017, at 14:49, Shailesh Jain 
> wrote:
>
> Thanks, Piotr. I'll try it out and will get back in case of any further
> questions.
>
> Shailesh
>
> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski 
> wrote:
>
>> 1.  It’s a little bit more complicated then that. Each operator
>> chain/task will be executed in separate thread (parallelism
>>  Multiplies that). You can check in web ui how was your job split into
>> tasks.
>>
>> 3. Yes that’s true, this is an issue. To preserve the individual
>> watermarks/latencies (assuming that you have some way to calculate them
>> individually per each device), you could either:
>>
>> a) have separate jobs per each device with parallelism 1. Pros:
>> independent failures/checkpoints, Cons: resource usage (number of threads
>> increases with number of devices, there are also other resources consumed
>> by each job), efficiency,
>> b) have one job with multiple data streams. Cons: resource usage (threads)
>> c) ignore Flink’s watermarks, and implement your own code in place of it.
>> You could read all of your data in single data stream, keyBy
>> partition/device and manually handle watermarks logic. You could either try
>> to wrap CEP/Window operators or copy/paste and modify them to suite your
>> needs.
>>
>> I would start and try out from a). If it work for your cluster/scale then
>> that’s fine. If not try b) (would share most of the code with a), and as a
>> last resort try c).
>>
>> Kostas, would you like to add something?
>>
>> Piotrek
>>
>> On 9 Nov 2017, at 19:16, Shailesh Jain 
>> wrote:
>>
>> On 1. - is it tied specifically to the number of source operators or to
>> the number of Datastream objects created. I mean does the answer change if
>> I read all the data from a single Kafka topic, get a Datastream of all
>> events, and the apply N filters to create N individual streams?
>>
>> On 3. - the problem with partitions is that watermarks cannot be
>> different per partition, and since in this use case, each stream is from a
>> device, the latency could be different (but order will be correct almost
>> always) and there are high chances of loosing out on events on operators
>> like Patterns which work with windows. Any ideas for workarounds here?
>>
>>
>> Thanks,
>> Shailesh
>>
>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski"  wrote:
>>
>> Hi,
>>
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>
>> Number of threads executing would be roughly speaking equal to of the
>> number of input data streams multiplied by the parallelism.
>>
>> 2.
>> Yes, you could dynamically create more data streams at the job startup.
>>
>> 3.
>> Running 1 independent data streams on a small cluster (couple of
>> nodes) will definitely be an issue, since even with parallelism set to 1,
>> there would be quite a lot of unnecessary threads.
>>
>> It would be much better to treat your data as a single data input stream
>> with multiple partitions. You could assign partitions between source
>> instances based on parallelism. For example with parallelism 6:
>> - source 0 could get partitions 0, 6, 12, 18
>> - source 1, could get partitions 1, 7, …
>> …
>> - source 5, could get partitions 5, 11, ...
>>
>> Piotrek
>>
>> On 9 Nov 2017, at 10:18, Shailesh Jain