Re: [Discuss] Semantics of event time for state TTL

2019-04-05 Thread Konstantin Knauf
Hi Andrey,

I agree with Elias. This would be the most natural behavior. I wouldn't add
additional slightly different notions of time to Flink.

As I can also see a use case for the combination

* Timestamp stored: Event timestamp
* Timestamp to check expiration: Processing Time

we could (maybe in a second step) add the possibility to mix and match time
characteristics for both aspects.

Cheers,

Konstantin

On Thu, Apr 4, 2019 at 7:59 PM Elias Levy 
wrote:

> My 2c:
>
> Timestamp stored with the state value: Event timestamp
> Timestamp used to check expiration: Last emitted watermark
>
> That follows the event time processing model used elsewhere in Flink.
> E.g. events are segregated into windows based on their event time, but the
> windows do not fire until the watermark advances past the end of the window.
>
>
> On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin 
> wrote:
>
>> Hi All,
>>
>> As you might have already seen there is an effort tracked in FLINK-12005
>> [1] to support event time scale for state with time-to-live (TTL) [2].
>> While thinking about the design, we realised that there can be multiple
>> options for the semantics of this feature, depending on the use case. There is also
>> sometimes confusion because of the out-of-order nature of event time in Flink. I
>> am starting this thread to discuss potential use cases of this feature and
>> their requirements for interested users and developers. There was already a
>> discussion thread asking about event time for TTL, and it contains
>> some thoughts [3].
>>
>> There are two semantic cases where we use time for the TTL feature at the
>> moment. Firstly, we store the timestamp of the last state access/update. Secondly,
>> we use this timestamp and the current timestamp to check expiration and garbage
>> collect state at some point later.
>>
>> At the moment, Flink supports *only processing time* for both timestamps:
>> state *last access and current timestamp*. It is basically the current local
>> system Unix epoch time.
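
For reference, here is a minimal sketch of how TTL is configured today, i.e. with processing time only, assuming the StateTtlConfig API available since Flink 1.6 (the state name and TTL duration are illustrative):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    // TTL is configured per state descriptor; the stored "last access" timestamp and
    // the expiration check both use processing time at the moment.
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("myState", String.class);
    descriptor.enableTimeToLive(ttlConfig);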
>>
>> When it comes to the event time scale, we also need to define what Flink should
>> use for these two timestamps. Here I will list some options and their
>> possible pros for discussion. There might be more depending on the use case.
>>
>> *Last access timestamp (stored in backend with the actual state value):*
>>
>>    - *Event timestamp of the currently processed record.* This seems to
>>    be the simplest option and it allows user-defined timestamps in the state
>>    backend. The problem here might be the instability of event time, which can
>>    not only increase but also decrease if records come out of order. This can
>>    lead to rewriting the state timestamp to a smaller value, which is unnatural
>>    for the notion of time.
>>    - *Max event timestamp of records seen so far for this record key.* This
>>    option is similar to the previous one but it tries to fix the notion of
>>    time to make it always increasing. Maintaining this timestamp also has
>>    performance implications because the previous timestamp needs to be read
>>    out to decide whether to rewrite it.
>>    - *Last emitted watermark.* This is what we usually use for other
>>    operations in Flink to trigger some actions, like timers and windows, but it
>>    can be unrelated to the record which actually triggers the state update.
>>
>> *Current timestamp to check expiration:*
>>
>>    - *Event timestamp of the last processed record.* Again quite simple but
>>    an unpredictable option for out-of-order events. It can potentially lead to
>>    undesirable expiration of late data buffered in state, without control.
>>    - *Max event timestamp of records seen so far for the operator backend.* Again
>>    similar to the previous one, more stable, but the user still does not have much
>>    control over when state expires.
>>    - *Last emitted watermark.* Again, this is what we usually use for other
>>    operations in Flink to trigger some actions, like timers and windows. It
>>    also gives the user some control to decide when state is expired (up to which
>>    point in event time) by emitting a certain watermark. It is more flexible but
>>    complicated. If some watermark emitting strategy is already used for other
>>    operations, it might not be optimal for TTL and might delay state cleanup.
>>    - *Current processing time.* This option is quite simple. It would mean
>>    that the user just decides which timestamp to store, but it will expire in real
>>    time. For the data privacy use case, it might be better because we want state
>>    to become unavailable at a particular real moment of time after the associated
>>    piece of data was created in event time. For long-term approximate garbage
>>    collection, it might not be a problem either. For quick expiration, the
>>    time skew between event and processing time can again lead to premature
>>    deletion of late data, and the user cannot delay it.
>>
>> We could also make this behaviour 

HA and zookeeper

2019-04-05 Thread Boris Lublinsky
For the HA implementation, is ZooKeeper used only for leader election, or does it
also store some data relevant for switching to the backup server?
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/



Re: InvalidProgramException when trying to sort a group within a dataset

2019-04-05 Thread Fabian Hueske
Hi,

Your POJO should implement the Serializable interface.
Otherwise it is not considered serializable.
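
For illustration, a minimal sketch of the suggested change (the class body is elided; the serialVersionUID is optional but recommended):

    import java.io.Serializable;

    public class ThresholdAcvFact implements Serializable {
        private static final long serialVersionUID = 1L;
        // fields, getters/setters and toString() as in the class shown below
    }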

Best, Fabian

Papadopoulos, Konstantinos 
schrieb am Mi., 3. Apr. 2019, 07:22:

> Hi Chesnay,
>
>
>
> Thanks for your support. ThresholdAcvFact class is a simple POJO with the
> following definition:
>
>
>
> public class ThresholdAcvFact {
>
>     private Long timePeriodId;
>     private Long geographyId;
>     private Long productId;
>     private Long customerId;
>     private Double basePrice;
>     private Double promoPrice;
>     private Double basePriceAcv;
>     private Double promoPriceAcv;
>     private Long count;
>
>     public Long getTimePeriodId() { return timePeriodId; }
>     public void setTimePeriodId(Long timePeriodId) { this.timePeriodId = timePeriodId; }
>     public Long getGeographyId() { return geographyId; }
>     public void setGeographyId(Long geographyId) { this.geographyId = geographyId; }
>     public Long getProductId() { return productId; }
>     public void setProductId(Long productId) { this.productId = productId; }
>     public Long getCustomerId() { return customerId; }
>     public void setCustomerId(Long customerId) { this.customerId = customerId; }
>     public Double getBasePrice() { return basePrice; }
>     public void setBasePrice(Double basePrice) { this.basePrice = basePrice; }
>     public Double getPromoPrice() { return promoPrice; }
>     public void setPromoPrice(Double promoPrice) { this.promoPrice = promoPrice; }
>     public Double getBasePriceAcv() { return basePriceAcv; }
>     public void setBasePriceAcv(Double basePriceAcv) { this.basePriceAcv = basePriceAcv; }
>     public Double getPromoPriceAcv() { return promoPriceAcv; }
>     public void setPromoPriceAcv(Double promoPriceAcv) { this.promoPriceAcv = promoPriceAcv; }
>     public Long getCount() { return count; }
>     public void setCount(Long count) { this.count = count; }
>
>     @Override
>     public String toString() {
>         return "ThresholdAcvFact{" +
>                 "timePeriodId=" + timePeriodId +
>                 ", geographyId=" + geographyId +
>                 ", productId=" + productId +
>                 ", customerId=" + customerId +
>                 ", basePrice=" + basePrice +
>                 ", promoPrice=" + promoPrice +
>                 ", basePriceAcv=" + basePriceAcv +
>                 ", promoPriceAcv=" + promoPriceAcv +
>                 ", count=" + count +
>                 '}';
>     }
> }
>
>
>
> The implementation of the function in which we faced the reported issue is
> the following:
>
>
>
> public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {
>
>     final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
>             thresholdAcvCalcSources.getBasePriceDataSet(),
>             thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
>             new ThresholdAcvBasePriceFactMapper(customerId));
>
>     final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
>             thresholdAcvCalcSources.getPromoPriceDataSet(),
>             thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
>             new ThresholdAcvPromoPriceFactMapper(customerId));
>
>     return basePriceFacts
>             .fullOuterJoin(promoPriceFacts)
>             .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
>             .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
>             .with(new ThresholdAcvFactBasePromoPriceJoiner())
>             .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
>             .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
>                 @Override
>                 public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
>                     return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
>                 }
>             }, Order.ASCENDING)
>             .reduceGroup(new ThresholdAcvFactCountGroupReducer());
> }
>
>
>
> Regards,
>
> Konstantinos
>
>
>
> *From:* Chesnay Schepler 
> *Sent:* Wednesday, April 3, 2019 12:59 PM
> *To:* Papadopoulos, Konstantinos
> ; user@flink.apache.org
> *Subject:* Re: InvalidProgramException when trying to sort a group within
> a dataset
>
>
>
> Your user-defined functions are referencing the class
> "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't
> serializable.
>
> My guess is that 

Re: Source reinterpretAsKeyedStream

2019-04-05 Thread Fabian Hueske
Hi,

Konstantin is right.
reinterpretAsKeyedStream only works if you call it on a DataStream that
was keyBy'ed before (with the same parallelism).
Flink cannot reuse the partitioning of another system like Kafka.

Best, Fabian
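
For illustration, a minimal sketch of the intended usage, with a hypothetical MyEvent type and getSourceId() accessor: the stream is first partitioned by Flink via keyBy, and only the output of a keyed downstream operator (same key selector, same parallelism) is reinterpreted as a KeyedStream:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    KeySelector<MyEvent, Integer> selector = new KeySelector<MyEvent, Integer>() {
        @Override
        public Integer getKey(MyEvent event) {
            return event.getSourceId();
        }
    };

    // 'source' is assumed to be a DataStream<MyEvent>.
    DataStream<MyEvent> partitioned = source
            .keyBy(selector)                            // Flink partitions the stream by the key
            .map(new MapFunction<MyEvent, MyEvent>() {  // keyed processing, parallelism unchanged
                @Override
                public MyEvent map(MyEvent event) {
                    return event;
                }
            });

    KeyedStream<MyEvent, Integer> keyed =
            DataStreamUtils.reinterpretAsKeyedStream(partitioned, selector);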


Adrienne Kole  schrieb am Do., 4. Apr. 2019, 14:33:

> Thanks a lot for the replies.
>
> Below I paste my code:
>
>
> DataStreamSource<Tuple> source = env.addSource(new MySource());
> KeyedStream<Tuple, Integer> keyedStream =
>     DataStreamUtils.reinterpretAsKeyedStream(source, new DummyKeySelector(),
>         TypeInformation.of(Integer.class));
> keyedStream.timeWindow(Time.seconds(1)).apply(
>     new WindowFunction<Tuple, Integer, Integer, TimeWindow>() {
>         @Override
>         public void apply(Integer integer, TimeWindow timeWindow,
>                 Iterable<Tuple> iterable, Collector<Integer> collector) throws Exception {
>             collector.collect(1);
>         }
>     });
> env.execute("Test");
>
> static class DummyKeySelector implements KeySelector<Tuple, Integer> {
>     @Override
>     public Integer getKey(Tuple value) throws Exception {
>         return value.getSourceID();
>     }
> }
>
> static class MySource extends RichParallelSourceFunction<Tuple> {
>     private int sourceID;
>
>     public MySource() {
>         this.sourceID = sourceID;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         sourceID = sourceID + getRuntimeContext().getIndexOfThisSubtask();
>     }
>
>     @Override
>     public void run(SourceContext<Tuple> ctx) throws Exception {
>         while (true) {
>             Tuple tuple = new Tuple(sourceID);
>             ctx.collect(tuple);
>         }
>     }
>
>     @Override
>     public void cancel() {
>     }
> }
>
>
> Whatever I do, I get
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>
> When I check the details in the source code, it seems that some keys are
> not within the allowed key range, which is why Flink throws an exception.
> In this case, as Konstantin said, it is not possible to interpret source
> as keyed.
> Please correct me if I am wrong.
>
>
> Thanks,
> Adrienne
>
>
>
>
>
>
>
> On Wed, Apr 3, 2019 at 8:08 PM Konstantin Knauf 
> wrote:
>
>> Hi Adrienne,
>>
>> you can only use DataStream#reinterpretAsKeyedStream on a stream, which
>> has previously been keyed/partitioned by Flink with exactly the same
>> KeySelector as given to reinterpretAsKeyedStream. It does not work with a
>> key-partitioned stream, which has been partitioned by any other process.
>>
>> Best,
>>
>> Konstantin
>>
>> On Fri, Mar 29, 2019 at 11:47 PM Rong Rong  wrote:
>>
>>> Hi Adrienne,
>>>
>>> I think you should be able to reinterpretAsKeyedStream by passing in a
>>> DataStreamSource based on the ITCase example [1].
>>> Can you share the full code/error logs or the IAE?
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.7.2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java#L98
>>>
>>> On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole 
>>> wrote:
>>>
 Dear community,

 I have a use-case where sources are keyed.
 For example, there is a source function with parallelism 10, and each
 instance has its own key.
 I used reinterpretAsKeyedStream to convert source DataStream to
 KeyedStream, however, I get an IllegalArgument exception.
 Can reinterpretAsKeyedStream be used with source operators as well,
 or does the operator have to be already partitioned (by keyBy(..))?

 Thanks,
 Adrienne

>>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Data Artisans GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>


Re: [DISCUSS] Drop Elasticsearch 1 connector

2019-04-05 Thread Stephan Ewen
+1 to drop it

Previously released versions are still available and compatible with newer
Flink versions anyways.

On Fri, Apr 5, 2019 at 2:12 PM Bowen Li  wrote:

> +1 for dropping elasticsearch 1 connector.
>
> On Wed, Apr 3, 2019 at 5:10 AM Chesnay Schepler 
> wrote:
>
>> Hello everyone,
>>
>> I'm proposing to remove the connector for elasticsearch 1.
>>
>> The connector is used significantly less than more recent versions (2&5
>> are downloaded 4-5x more), and hasn't seen any development for over a
> year, yet still incurs maintenance overhead due to licensing and
>> testing.
>>
>>
>>


Re: Cannot download Jars from S3 due to resource timestamp changed

2019-04-05 Thread Yan Yan
Hi Yantao,

Thanks, I have also commented in the original JIRA.
https://issues.apache.org/jira/browse/FLINK-8801?focusedCommentId=16807691=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel

@Nico @Till Do you mind reviewing whether an alternative fix is needed? If
so, I can create a new JIRA.

Thanks,
Yan

On Thu, Apr 4, 2019 at 5:45 PM yangtao.yt 
wrote:

> Hi, Yan.
> We have met this problem too when using aliyun-pangu and have commented
> in FLINK-8801, but there has been no response yet.
> I think most file systems, including s3/s3a/s3n/azure/aliyun-oss etc., can
> encounter this problem since they don't implement FileSystem#setTimes, but
> the PR in FLINK-8801 assumes they do.
> We have made a similar workaround for this problem.
>
> Comment link:
> https://issues.apache.org/jira/browse/FLINK-8801?focusedCommentId=16807691=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16807691
>
> Best,
> Tao Yang
>
> 在 2019年4月5日,上午5:22,Yan Yan  写道:
>
> Hi,
>
> I am running into issues when trying to move from HDFS to S3 using Flink 1.6.
>
> I am getting an exception from Hadoop code:
>
> IOException("Resource " + sCopy +
> " changed on src filesystem (expected " + resource.getTimestamp() +
> ", was " + sStat.getModificationTime());
>
>
> Digging into this, I found there was one commit
> 
>  made
> by Nico trying to fix this issue in 2018. However, the fix did not work for
> my case, as the fs.setTimes() method was not implemented in the hadoop-aws
> S3AFilesystem I am using. And it seems S3 does not allow you to override
> the last modified time for an object.
>
> I am able to make a workaround the other way round: reading the timestamp
> from S3 and overriding the local resource. Just wondering if anyone has seen
> similar issues, or has actually made it work by using a different
> implementation of S3AFilesystem? Thanks!
>
> --
> Best,
> Yan
>
>
>

-- 
Best,
Yan


Re: [DISCUSS] Drop Elasticsearch 1 connector

2019-04-05 Thread Bowen Li
+1 for dropping elasticsearch 1 connector.

On Wed, Apr 3, 2019 at 5:10 AM Chesnay Schepler  wrote:

> Hello everyone,
>
> I'm proposing to remove the connector for elasticsearch 1.
>
> The connector is used significantly less than more recent versions (2&5
> are downloaded 4-5x more), and hasn't seen any development for over a
> hear, yet still incurred maintenance overhead due to licensing and testing.
>
>
>


Flink 1.7.1 JobManager (docker) exits with status 0 - completed

2019-04-05 Thread anaray
Hi,

We are using Flink 1.7.1 and running it as a docker container. The state backend is
Ceph. The problem is that the JobManager exits on startup with docker exit code 0 (i.e.
Completed). The only error/exception that I see is given below. Please share
your thoughts.

2019-04-05 12:14:04,314 INFO 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
 
- Error when creating PropertyDescriptor for public final void
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
Ignoring this property.
2019-04-05 12:14:40,787 ERROR
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser
 
- S3 response indicates truncated results, but contains no object summaries.
2019-04-05 12:20:08,186 ERROR
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser
 
- S3 response indicates truncated results, but contains no object summaries.
2019-04-05 12:20:11,285 ERROR
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser
 
- S3 response indicates truncated results, but contains no object summaries.

Thanks,




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: print() method does not always print on the taskmanager.out file

2019-04-05 Thread Felipe Gutierrez
I guess it has something to do with the parallelism of the cluster. When
I set "taskmanager.numberOfTaskSlots" to 1 and do not use
"setParallelism()", I can see the logs. In Eclipse I can also see the logs.

Does anybody have a clue?
Thanks
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Fri, Apr 5, 2019 at 5:10 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> no. It did not work.
>
> I also created a Sink that is an MQTT publisher (
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/mqtt/MqttSensorPublisher.java)
> and in Eclipse it works. When I deploy my job on my Flink cluster, it
> does not work. There might be something wrong with my cluster configuration.
>
> Something that I did was comment out the line "# 127.0.1.1 ubuntu16-worker01"
> in the "/etc/hosts" file so that the JobManager can find the TaskManager. I
> also commented out this line on the master node. The master is my machine
> and the worker is a virtual machine.
>
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Fri, Apr 5, 2019 at 2:50 PM Chesnay Schepler 
> wrote:
>
>> This kind of sounds like an OutputStream flushing issue. Try calling
>> "System.out.flush()" now and then in your sink and report back.
>>
>> On 04/04/2019 18:04, Felipe Gutierrez wrote:
>>
>> Hello,
>>
>> I am studying the parallelism of tasks on DataStream. So, I have
>> configured Flink to execute on my machine (master node) and one virtual
>> machine (worker node).  The master has 4 cores
>> (taskmanager.numberOfTaskSlots: 4) and the worker only 2 cores
>> (taskmanager.numberOfTaskSlots: 2). I don't need to set this on the
>> 'conf/flink-conf.yaml', this was just to ensure that I am relating the
>> properties with the right concepts.
>>
> When I create an application with parallelism of 1, 2, or 4, sometimes I
> can see the output of the "print()" method, other times not. I checked the
> output files of the task managers ("flink-flink-taskexecutor-0-master.out"
> or "flink-flink-taskexecutor-0-worker.out") and I cancel the job and start
> it again. All of a sudden I can see the output in the .out file.
> I was thinking that it was because I am creating a job with more
> parallelism than the cluster supports, but this behavior also happens when
> I set the parallelism of my job to less than the slots available.
>>
> I guess if I see on the Flink dashboard X task slots available, and when I
> deploy my Job, the Job is running and the slots available decreased
> according to the parallelism of my Job, everything should be
> correct, shouldn't it? I also created a Dummy Sink just to print the output,
>> but the behavior is the same.
>>
>> Here is my code:
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorRandomPartitionByKeyDAG.java#L48
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez *
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>>


Enabling JMX Reporter on a Local Mini Cluster

2019-04-05 Thread Frank Wilson
What's the best way to enable the JMX Reporter while I am developing an
application in an IDE? The reason is that I would like to experiment with adding
detailed metrics to my pipelines (and also see what standard operators
provide) without having to deploy to a regular cluster.
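
One approach that should work for a local mini cluster (a hedged sketch; the reporter class name and port option are the ones documented for Flink's built-in JMX reporter, everything else is illustrative) is to pass the reporter settings in the Configuration used to create the local environment:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Register the built-in JMX reporter with the mini cluster started inside the IDE.
    conf.setString("metrics.reporter.jmx.class", "org.apache.flink.metrics.jmx.JMXReporter");
    conf.setString("metrics.reporter.jmx.port", "8789-8799"); // optional fixed port range

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1, conf);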

Thanks,

Frank


Re: print() method does not always print on the taskmanager.out file

2019-04-05 Thread Felipe Gutierrez
no. It did not work.

I also created a Sink that is an MQTT publisher (
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/mqtt/MqttSensorPublisher.java)
and in Eclipse it works. When I deploy my job on my Flink cluster, it
does not work. There might be something wrong with my cluster configuration.

Something that I did was comment out the line "# 127.0.1.1 ubuntu16-worker01"
in the "/etc/hosts" file so that the JobManager can find the TaskManager. I
also commented out this line on the master node. The master is my machine
and the worker is a virtual machine.


*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Fri, Apr 5, 2019 at 2:50 PM Chesnay Schepler  wrote:

> This kind of sounds like an OutputStream flushing issue. Try calling
> "System.out.flush()" now and then in your sink and report back.
>
> On 04/04/2019 18:04, Felipe Gutierrez wrote:
>
> Hello,
>
> I am studying the parallelism of tasks on DataStream. So, I have
> configured Flink to execute on my machine (master node) and one virtual
> machine (worker node).  The master has 4 cores
> (taskmanager.numberOfTaskSlots: 4) and the worker only 2 cores
> (taskmanager.numberOfTaskSlots: 2). I don't need to set this on the
> 'conf/flink-conf.yaml', this was just to ensure that I am relating the
> properties with the right concepts.
>
> When I create an application with parallelism of 1, 2, or 4, sometimes I
> can see the output of the "print()" method, other times not. I checked the
> output files of the task managers ("flink-flink-taskexecutor-0-master.out"
> or "flink-flink-taskexecutor-0-worker.out") and I cancel the job and start
> it again. All of a sudden I can see the output in the .out file.
> I was thinking that it was because I am creating a job with more
> parallelism than the cluster supports, but this behavior also happens when
> I set the parallelism of my job to less than the slots available.
>
> I guess if I see on the Flink dashboard X task slots available, and when I
> deploy my Job, the Job is running and the slots available decreased
> according to the parallelism of my Job, everything should be
> correct, shouldn't it? I also created a Dummy Sink just to print the output,
> but the behavior is the same.
>
> Here is my code:
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorRandomPartitionByKeyDAG.java#L48
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez *
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
>


Re: Flink on Mesos

2019-04-05 Thread Till Rohrmann
Hi Juan,

thanks for reporting this issue. If you could open an issue and also
provide a fix for it, then this would be awesome. Please let me know the
ticket number so that I can monitor it and give your PR a review.

Cheers,
Till

On Fri, Apr 5, 2019 at 5:34 AM Juan Gentile  wrote:

> Hello!
>
>
>
> We are having a small problem while trying to deploy Flink on Mesos using
> marathon. In our set up of Mesos we are required to specify the amount of
> disk space we want to have for the applications we deploy there.
>
> The current default value in Flink is 0 and it is currently not
> parameterizable. This means that we ask for 0 disk space for our instances, so
> Flink can't work.
>
> I’d appreciate suggestions if you have any. Otherwise and since this is
> causing some problems on our side, I’d like to know if I can create a
> ticket on Flink and work on it; looks like the fix should be quite easy to
> implement.
>
>
>
> Thank you,
>
> Juan.
>


Re: How to submit Flink program to Yarn without upload the fat jar?

2019-04-05 Thread Chesnay Schepler
Which Flink version are you using? The DISABLED value has not been 
working since 1.5, so you may be stuck with uploading the app jar every 
time.


On 04/04/2019 11:35, 徐涛 wrote:

Hi Experts,
When submitting a Flink program to Yarn, the app jar (a fat jar of about 200 MB
with Flink dependencies) will be uploaded to Yarn, which takes a lot of time. I
checked the code in CliFrontend and found that there is a config item named
"yarn.per-job-cluster.include-user-jar". I tried setting the config item value to
"DISABLED"; then the fat jar is not uploaded, but I have to set the CLASSPATH
environment of the Yarn container to point to a route on HDFS. I also found a
config item starting with "containerized.master.env."; I set the
"containerized.master.env.CLASSPATH" value to
"hdfs://xm-hdfs-test-01/henry/flink-application-0.0.1-SNAPSHOT-uber.jar", but
with no luck. I also found that the ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX
variable is marked as deprecated, so maybe I cannot use it.
So how could this requirement be implemented? I think it would
save a lot of time when starting the Flink program.

Best
Henry





Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-05 Thread Chesnay Schepler

> I tried using  [ keyBy(KeySelector, TypeInformation) ]

What was the result of this approach?
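
For reference, a sketch of what that approach could look like with a concrete Tuple6 key type instead of the abstract Tuple (based on the getters in the quoted code below; not a verified fix):

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Describe the concrete key type explicitly so no type information is lost to erasure.
    TypeInformation<Tuple6<String, String, String, String, String, String>> keyInfo =
            TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>() {});

    KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>> keyedStream =
            kinesisStream.keyBy(
                    new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
                        @Override
                        public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) {
                            return new Tuple6<>(mon.getDeployment(), mon.getGameId(), mon.getEventName(),
                                    mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
                        }
                    },
                    keyInfo);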

On 03/04/2019 17:36, Vijay Balakrishnan wrote:

Hi Tim,
Thanks for your reply. I am not seeing an option to specify a
.returns(new TypeHint<Tuple6<String,String,String,String,String,String>>(){}) with KeyedStream ??

monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
    public Tuple getKey(Monitoring mon) throws Exception
    { .. return new Tuple6<>(..) .. } })

I tried using
TypeInformation<Tuple6<String, String, String, String, String, String>> info =
    TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});

kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...},
    info); //specify typeInfo through

TIA,
Vijay

On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor > wrote:


Flink needs type information for serializing and deserializing
objects, and that is lost due to Java type erasure.   The only way
to workaround this is to specify the return type of the function
called in the lambda.

Fabian's answer here explains it well.


https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554

Tim

On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan
mailto:bvija...@gmail.com>> wrote:

Hi,
I am trying to use KeyedStream<Monitoring, Tuple> to handle
different types of Tuples, including Tuple6.
Keep getting the Exception:
*Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException:
Usage of class Tuple as a type is not allowed. Use a concrete
subclass (e.g. Tuple1, Tuple2, etc.) instead*.
Is there a way around Type Erasure here ?
I want to use KeyedStream<Monitoring, Tuple> so that I can
pass it on and treat Tuple6 as a Tuple, like the
monitoringTupleKeyedStream.

Code below:

KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
String keyOperationType = ;//provided
if (StringUtils.isNotEmpty(keyOperationType)) {
    if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
        monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
        monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
        TypeInformation<Tuple6<String, String, String, String, String, String>> info =
            TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
        monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {
                String key = "";
                String keyName = "";
                final String eventName = mon.getEventName();
                if (eventName != null && (eventName.equalsIgnoreCase(INGRESS_FPS))) {
                    keyName = PCAM_ID;
                    key = mon.getEventDataMap() != null ?
                        (String) mon.getEventDataMap().get(PCAM_ID) : "";
                } else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                    keyName = OUT_BITRATE;
                    key = mon.getEventDataMap() != null ?
                        (String) mon.getEventDataMap().get(OUT_BITRATE) : "";
                    //TODO: identify key to use
                }
                mon.setKeyName(keyName);
                mon.setKeyValue(key);
                return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
                    mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
            }
        }); //, info)
    } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
        monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
                "instance", "container"); //<== this is also a Tuple6 but no complaints ?
    }
}



This example below needs monitoringTupleKeyedStream to be
KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>>

TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new
    TypeHint<Tuple6<String, String, String, String, String, String>>(){});
monitoringTupleKeyedStream = kinesisStream.keyBy(new
    KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
        @Override
        public Tuple6

Re: print() method does not always print on the taskmanager.out file

2019-04-05 Thread Chesnay Schepler
This kind of sounds like an OutputStream flushing issue. Try calling
"System.out.flush()" now and then in your sink and report back.


On 04/04/2019 18:04, Felipe Gutierrez wrote:

Hello,

I am studying the parallelism of tasks on DataStream. So, I have 
configured Flink to execute on my machine (master node) and one 
virtual machine (worker node).  The master has 4 cores 
(taskmanager.numberOfTaskSlots: 4) and the worker only 2 cores 
(taskmanager.numberOfTaskSlots: 2). I don't need to set this on the 
'conf/flink-conf.yaml', this was just to ensure that I am relating the 
properties with the right concepts.


When I create an application with parallelism of 1, 2, or 4, sometimes
I can see the output of the "print()" method, other times not. I checked
the output files of the task managers
("flink-flink-taskexecutor-0-master.out" or
"flink-flink-taskexecutor-0-worker.out") and I cancel the job and
start it again. All of a sudden I can see the output in the .out file.
I was thinking that it was because I am creating a job with more
parallelism than the cluster supports, but this behavior also happens
when I set the parallelism of my job to less than the slots available.


I guess if I see on the Flink dashboard X task slots available, and when
I deploy my Job, the Job is running and the slots available decreased
according to the parallelism of my Job, everything should be
correct, shouldn't it? I also created a Dummy Sink just to print the
output, but the behavior is the same.


Here is my code: 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorRandomPartitionByKeyDAG.java#L48


Thanks,
Felipe
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez
*
*--_https://felipeogutierrez.blogspot.com_*





Flink on Mesos

2019-04-05 Thread Juan Gentile
Hello!

We are having a small problem while trying to deploy Flink on Mesos using 
marathon. In our set up of Mesos we are required to specify the amount of disk 
space we want to have for the applications we deploy there.
The current default value in Flink is 0 and it is currently not
parameterizable. This means that we ask for 0 disk space for our instances, so Flink
can't work.
I'd appreciate suggestions if you have any. Otherwise, since this is causing
some problems on our side, I'd like to know if I can create a ticket on Flink
and work on it; it looks like the fix should be quite easy to implement.

Thank you,
Juan.


Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-05 Thread min.tan
Hi,

 

I keep getting exceptions
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
ongoing snapshots. Increase kafka producers pool size or decrease number of
concurrent checkpoints."

 

I think that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and I need to increase
this size. What should I consider when increasing this size?

 

I run with a parallelism of 16 and have a checkpoint setting like this:

 

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(10_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
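
For reference, a hedged sketch of increasing the pool size, assuming the universal Kafka connector (FlinkKafkaProducer) and its constructor overload that takes kafkaProducersPoolSize; the topic, properties, and pool size are illustrative, so check the exact overload for your connector version:

    import java.util.Optional;
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    Optional<FlinkKafkaPartitioner<String>> partitioner = Optional.empty();

    // The pool size should be larger than the number of checkpoints that can be in
    // flight at the same time (the default pool size is 5).
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            "my-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            partitioner,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
            10);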

 

Regards,

 

Min



smime.p7s
Description: S/MIME cryptographic signature