Re: Flink 1.4: Queryable State Client

2018-10-13 Thread vino yang
Hi Seye,

It seems that you have conducted an in-depth analysis of this issue.
If you think it's a bug or needs improvement, please feel free to create a
JIRA issue to track its status.

Thanks, vino.

Seye Jin wrote on Sun, Oct 14, 2018 at 12:02 AM:

> I recently upgraded to Flink 1.4 from 1.3 and leverage the Queryable State
> client in my application. I have 1 JM and 5 TMs, all serviced behind
> Kubernetes. A large state is built and distributed evenly across task
> managers, and the client can query state for a specified key.
>
> Issue: if a task manager dies and a new one gets spun up (automatically)
> and the QS state successfully recovers on the new nodes/task slots, I start to
> get timeout exceptions when the client tries to query for a key, even if I
> try to reset or re-deploy the client jobs.
>
> I have been trying to triage this and figure out a way to remediate the
> issue, and I found that KvStateClientProxyHandler, which is not publicly
> exposed, has a forceUpdate flag that can help reset the KvStateLocations (plus
> inetAddresses), but it defaults to false and can't be overridden.
>
> I was wondering if anyone knows how to remediate this kind of issue, or if
> there is a way to have the JobManager know that the cached task manager
> location is no longer valid.
>
> Any tip to resolve this will be appreciated (I can't downgrade back to 1.3
> or upgrade from 1.4)
>
>


Re: When does Trigger.clear() get called?

2018-10-13 Thread Averell
Hello Hequn,

Thanks for the answers.
Regarding question no. 2, I am now clear.
Regarding question no. 1, does your answer apply to custom states as well? This
concern came from Flink's implementation of CountTrigger, in which a custom
state is explicitly cleared in Trigger.clear():

    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

My 3rd question was about ordinary, non-windowed keyed streams, for which I
don't see any mention of Triggers in Flink's documentation, so how can I clear
the state of those streams?

Thank you very much for your help.
Regards,
Averell
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: When does Trigger.clear() get called?

2018-10-13 Thread Hequn Cheng
Hi Averell,

> 1. Neither PURGE nor clear() removes the States (so the States must be
explicitly cleared by the user).
Both PURGE and clear() remove state. The PURGE action removes the window
state, i.e., the aggregate value. clear() removes the window metadata,
including the state kept in the Trigger.

> 2. When an event for a window arrives after PURGE has been called, it is
still processed, and is treated as the first event of that window.
In most cases, the answer is yes. However, there is a chance that the event
is not treated as the first one by the trigger, since PURGE clears the
window state but the window metadata, including the Trigger, remains.

> If I know that some keys would never have new events anymore, should/could
I remove the streams corresponding to those keys?
Yes. I think we can return FIRE_AND_PURGE.
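
For illustration, a minimal sketch of such a trigger (a hypothetical example
modelled loosely on CountTrigger, not Flink's actual implementation; the class
name and count handling are made up): it returns FIRE_AND_PURGE once a
threshold is reached and clears its own partitioned state in clear():

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Hypothetical sketch, not Flink's built-in CountTrigger.
public class FireAndPurgeCountTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxCount;

    // Custom trigger state: the element count per key and window.
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public FireAndPurgeCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            // FIRE_AND_PURGE emits the result and drops the window contents (the aggregate),
            // but the window metadata and this Trigger's state stay until clear() is called.
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // Called when the window is discarded: explicitly clear the custom state,
        // just like Flink's CountTrigger does.
        ctx.getPartitionedState(stateDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

The key point is that FIRE_AND_PURGE drops the window contents, while the
trigger's own state only goes away when clear() is invoked for that window.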

Best, Hequn



On Sun, Oct 14, 2018 at 7:30 AM Averell  wrote:

> Hello Fabian,
>
> So could I assume the following?
>
> 1. Neither PURGE nor clear() removes the States (so the States must be
> explicitly cleared by the user).
> 2. When an event for a window arrives after PURGE has been called, it is
> still processed, and is treated as the first event of that window.
>
> And one related question: for keyed streams, if I know that some keys would
> never have new events anymore, should/could I remove those streams
> corresponding to those keys so that I can save some memory allocated to the
> metadata?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Questions in sink exactly once implementation

2018-10-13 Thread Hequn Cheng
Hi Henry,

> 1. I have heard of an idempotent way but I do not know how to implement it;
would you please enlighten me about it with an example?
It's a property of the result data. For example, you can overwrite old
values with new ones using a primary key.
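
For example, a minimal sketch of an idempotent JDBC upsert sink (the class
name, table, columns, and connection settings are all made up; no batching or
reconnection handling): because "word" is the primary key, replaying the same
record after a failure simply overwrites the previous value, so the end result
stays the same.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical idempotent MySQL sink: "word" is the primary key, so replaying
// the same record after a failure just overwrites the previous value.
public class UpsertMySqlSink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Connection conn;
    private transient PreparedStatement stmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pwd");
        stmt = conn.prepareStatement(
                "INSERT INTO word_count (word, cnt) VALUES (?, ?) "
                        + "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)");
    }

    @Override
    public void invoke(Tuple2<String, Long> value) throws Exception {
        stmt.setString(1, value.f0);
        stmt.setLong(2, value.f1);
        stmt.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) stmt.close();
        if (conn != null) conn.close();
    }
}

With a sink like this, at-least-once delivery from Flink is enough to get an
exactly-once end result.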

> 2. If dirty data are *added* but not updated
This goes against idempotency. Idempotency ensures that the result is
consistent in the end.

> 3. If using two-phase commit, the sink must support transactions.
I think the answer is yes.

Best, Hequn


On Sat, Oct 13, 2018 at 8:49 PM 徐涛  wrote:

> Hi Hequn,
> Thanks a lot for your response. I have a few questions about this topic.
> Would you please help me with them?
> 1. I have heard of an idempotent way but I do not know how to implement it;
> would you please enlighten me about it with an example?
> 2. If dirty data are *added* but not updated, then overwrite alone is not
> enough, I think.
> 3. If using two-phase commit, the sink must support transactions.
> 3.1 If the sink does not support transactions, for example Elasticsearch,
> do I *have to* use the idempotent way to implement exactly-once?
> 3.2 If the sink supports transactions, for example MySQL, the idempotent way
> and two-phase commit are both OK. But like you say, if there are a lot of items
> between checkpoints, the batch insert is a heavy action, so I still have to
> use the idempotent way to implement exactly-once.
>
>
> Best
> Hequn
>
> On Oct 13, 2018, at 11:43 AM, Hequn Cheng wrote:
>
> Hi Henry,
>
> Yes, exactly-once using the atomic way is heavy for MySQL. However, you don't
> have to buffer data if you choose option 2. You can simply overwrite old
> records with new ones if the result data is idempotent, and this way can also
> achieve exactly-once.
> There is a document about End-to-End Exactly-Once Processing in Apache
> Flink[1], which may be helpful for you.
>
> Best, Hequn
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
>
>
> On Fri, Oct 12, 2018 at 5:21 PM 徐涛  wrote:
>
>> Hi
>> I am reading the book “Introduction to Apache Flink”, and the
>> book mentions two ways to achieve sink exactly-once:
>> 1. The first way is to buffer all output at the sink and commit
>> this atomically when the sink receives a checkpoint record.
>> 2. The second way is to eagerly write data to the output, keeping
>> in mind that some of this data might be “dirty” and replayed after a
>> failure. If there is a failure, then we need to roll back the output, thus
>> overwriting the dirty data and effectively deleting dirty data that has
>> already been written to the output.
>>
>> I read the code of the Elasticsearch sink, and found there is a
>> flushOnCheckpoint option; if set to true, the changes will accumulate until
>> a checkpoint is made. I guess it will guarantee at-least-once delivery,
>> because although it uses a batch flush, the flush is not an atomic action,
>> so it cannot guarantee exactly-once delivery.
>>
>> My questions are:
>> 1. As many sinks do not support transactions, in this case I have
>> to choose option 2 to achieve exactly-once. In that case, I have to buffer all
>> the records between checkpoints and delete them, which is a rather heavy action.
>> 2. I guess a MySQL sink should support exactly-once delivery,
>> because it supports transactions, but in this case I have to execute the batch
>> according to the number of actions between checkpoints rather than a specific
>> number, 100 for example. When there are a lot of items between checkpoints,
>> it is a heavy action as well.
>>
>> Best
>> Henry
>
>
>


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-13 Thread Shuyi Chen
Welcome to the community and thanks for the great proposal, Xuefu! I think
the proposal can be divided into 2 stages: making Flink support Hive
features, and making Hive work with Flink. I agree with Timo on starting
with a smaller scope, so we can make progress faster. As for [6], a proposal
for DDL is already in progress and will come after the unified SQL connector
API is done. For supporting Hive syntax, we might need to work with the
Calcite community, and a recent effort called babel (
https://issues.apache.org/jira/browse/CALCITE-2280) in Calcite might help
here.

Thanks
Shuyi

On Wed, Oct 10, 2018 at 8:02 PM Zhang, Xuefu 
wrote:

> Hi Fabian/Vno,
>
> Thank you very much for your encouragement and inquiry. Sorry that I didn't
> see Fabian's email until I read Vino's response just now. (Somehow Fabian's
> went to the spam folder.)
>
> My proposal contains long-term and short-term goals. Nevertheless, the
> effort will focus on the following areas, including Fabian's list:
>
> 1. Hive metastore connectivity - This covers both read/write access, which
> means Flink can make full use of Hive's metastore as its catalog (at least
> for batch, but it can be extended for streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.)
> created by Hive can be understood by Flink, and the reverse direction is
> true as well.
> 3. Data compatibility - Similar to #2, data produced by Hive can be
> consumed by Flink and vice versa.
> 4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides
> its own implementation or makes Hive's implementation work in Flink.
> Further, for user-created UDFs in Hive, Flink SQL should provide a
> mechanism allowing users to import them into Flink without any code change
> required.
> 5. Data types -  Flink SQL should support all data types that are
> available in Hive.
> 6. SQL Language - Flink SQL should support SQL standard (such as SQL2003)
> with extension to support Hive's syntax and language features, around DDL,
> DML, and SELECT queries.
> 7. SQL CLI - this is currently being developed in Flink, but more effort is
> needed.
> 8. Server - provide a server that's compatible with Hive's HiveServer2 in
> its thrift APIs, such that HiveServer2 users can reuse their existing client
> (such as beeline) but connect to Flink's thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for
> other applications to use to connect to its thrift server.
> 10. Support other user customizations in Hive, such as Hive SerDes,
> storage handlers, etc.
> 11. Better task failure tolerance and task scheduling at Flink runtime.
>
> As you can see, achieving all of those requires significant effort across
> all layers in Flink. However, a short-term goal could include only the core
> areas (such as 1, 2, 4, 5, 6, 7) or start with a smaller scope (such as #3,
> #6).
>
> Please share your further thoughts. If we generally agree that this is the
> right direction, I could come up with a formal proposal quickly and then we
> can follow up with broader discussions.
>
> Thanks,
> Xuefu
>
>
>
> --
> Sender:vino yang 
> Sent at:2018 Oct 11 (Thu) 09:45
> Recipient:Fabian Hueske 
> Cc:dev ; Xuefu ; user <
> user@flink.apache.org>
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi Xuefu,
>
> I appreciate this proposal, and, like Fabian, I think it would be better if
> you could give more details of the plan.
>
> Thanks, vino.
>
> Fabian Hueske wrote on Wed, Oct 10, 2018 at 5:27 PM:
> Hi Xuefu,
>
> Welcome to the Flink community and thanks for starting this discussion!
> Better Hive integration would be really great!
> Can you go into details of what you are proposing? I can think of a couple
> ways to improve Flink in that regard:
>
> * Support for Hive UDFs
> * Support for Hive metadata catalog
> * Support for HiveQL syntax
> * ???
>
> Best, Fabian
>
> On Tue, Oct 9, 2018 at 19:22, Zhang, Xuefu <
> xuef...@alibaba-inc.com> wrote:
> Hi all,
>
> Along with the community's effort, inside Alibaba we have explored Flink's
> potential as an execution engine not just for stream processing but also
> for batch processing. We are encouraged by our findings and have initiated
> our effort to make Flink's SQL capabilities full-fledged. When comparing
> what's available in Flink to the offerings from competitive data processing
> engines, we identified a major gap in Flink: good integration with the Hive
> ecosystem. This is crucial to the success of Flink SQL and batch due to the
> well-established data ecosystem around Hive. Therefore, we have done some
> initial work in this direction, but there is still a lot of effort
> needed.
>
> We have two strategies in mind. The first one is to make Flink SQL
> full-fledged and well-integrated with the Hive ecosystem. This is a similar
> approach to what Spark SQL adopted. The second strategy is to make Hive
> itself work with Flink, similar 

Re: When does Trigger.clear() get called?

2018-10-13 Thread Averell
Hello Fabian,

So could I assume the following?

1. Neither PURGE nor clear() removes the States (so the States must be
explicitly cleared by the user).
2. When an event for a window arrives after PURGE has been called, it is
still processed, and is treated as the first event of that window.

And one related question: for keyed streams, if I know that some keys would
never have new events anymore, should/could I remove those streams
corresponding to those keys so that I can save some memory allocated to the
metadata?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Identifying missing events in keyed streams

2018-10-13 Thread Averell
Thank you Fabian.

Tried (2), and it's working well.
One more benefit I found of (2) over (3) is that it allows me to easily raise
multiple levels of alarms for each keyed stream (e.g., minor: missed 2
cycles; major: missed 5 cycles; ...).

Thanks for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink 1.4: Queryable State Client

2018-10-13 Thread Seye Jin
I recently upgraded to Flink 1.4 from 1.3 and leverage the Queryable State
client in my application. I have 1 JM and 5 TMs, all serviced behind
Kubernetes. A large state is built and distributed evenly across task
managers, and the client can query state for a specified key.

Issue: if a task manager dies and a new one gets spun up (automatically), and
the QS state successfully recovers on the new nodes/task slots, I start to get
timeout exceptions when the client tries to query for a key, even if I try to
reset or re-deploy the client jobs.

I have been trying to triage this and figure out a way to remediate the
issue, and I found that KvStateClientProxyHandler, which is not publicly
exposed, has a forceUpdate flag that can help reset the KvStateLocations (plus
inetAddresses), but it defaults to false and can't be overridden.

I was wondering if anyone knows how to remediate this kind of issue, or if
there is a way to have the JobManager know that the cached task manager
location is no longer valid.

Any tip to resolve this will be appreciated (I can't downgrade back to 1.3
or upgrade from 1.4)
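
For reference, the client side looks roughly like the following minimal sketch
(Flink 1.4 queryable state client API; the host, port, job id, key, and state
name below are placeholders), in case it helps reproduce the timeout:

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QsClientSketch {
    public static void main(String[] args) throws Exception {
        // Points at the queryable state proxy running on a task manager
        // (port 9069 is the default unless configured otherwise).
        QueryableStateClient client = new QueryableStateClient("query-proxy-host", 9069);

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("my-state", Long.class);

        // Asynchronously look up the state registered under "my-queryable-state-name"
        // for the given key of the running job.
        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"),
                "my-queryable-state-name",
                "some-key",
                BasicTypeInfo.STRING_TYPE_INFO,
                descriptor);

        System.out.println(future.get().value());
    }
}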


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-13 Thread Bowen
Thank you Xuefu, for bringing up this awesome, detailed proposal! It will 
resolve lots of existing pain for users like me.

In general, I totally agree that improving FlinkSQL's completeness would be a
much better starting point than building 'Hive on Flink', as the Hive community is
concerned about Flink's SQL incompleteness and lack of proven batch performance,
as shown in https://issues.apache.org/jira/browse/HIVE-10712. Improving FlinkSQL
seems a more natural direction to start with in order to achieve the
integration.

Xuefu and Timo have laid out a quite clear path of what to tackle next. Given that
there are already some efforts going on for items 1, 2, 5, 3, 4, 6 in Xuefu's list,
shall we:
1. identify the gaps between a) Xuefu's proposal/the discussion results in this
thread and b) all the ongoing work/discussions?
2. then create some new top-level JIRA tickets to keep track of them and start
more detailed discussions?
It's going to be a great and influential project, and I'd love to participate
in it to move FlinkSQL's adoption and ecosystem even further.

Thanks,
Bowen


> On Oct 12, 2018, at 3:37 PM, Jörn Franke wrote:
> 
> Thank you, very nice, I fully agree with that.
> 
>> On Oct 11, 2018, at 19:31, Zhang, Xuefu wrote:
>> 
>> Hi Jörn,
>> 
>> Thanks for your feedback. Yes, I think Hive on Flink makes sense and in fact
>> it is one of the two approaches that I named in the beginning of the thread.
>> As also pointed out there, this isn't mutually exclusive of the work we
>> proposed inside Flink, and they target different user groups and use
>> cases. Further, what we proposed to do in Flink should be a good showcase
>> that demonstrates Flink's capabilities in batch processing and convinces the
>> Hive community of the worth of a new engine. As you might know, the idea
>> encountered some doubt and resistance. Nevertheless, we do have a solid plan
>> for Hive on Flink, which we will execute once Flink SQL is in good shape.
>> 
>> I also agree with you that Flink SQL shouldn't be closely coupled with Hive.
>> While we mentioned Hive in many of the proposed items, most of them are
>> coupled only in concepts and functionality rather than code or libraries. We
>> are taking advantage of the connector framework in Flink. The only thing
>> that might be an exception is supporting Hive built-in UDFs, which we may not
>> make work out of the box in order to avoid the coupling. We could, for example,
>> require users to bring in the Hive library and register the functions
>> themselves. This is subject to further discussion.
>> 
>> #11 is about Flink runtime enhancements that are meant to make task failures
>> more tolerable (so that the job doesn't have to start from the beginning in
>> case of task failures) and to make task scheduling more resource-efficient.
>> Flink's current design in those two aspects leans more toward stream processing,
>> which may not be good enough for batch processing. We will provide a more
>> detailed design when we get to them.
>> 
>> Please let me know if you have further thoughts or feedback.
>> 
>> Thanks,
>> Xuefu
>> 
>> 
>> --
>> Sender:Jörn Franke 
>> Sent at:2018 Oct 11 (Thu) 13:54
>> Recipient:Xuefu 
>> Cc:vino yang ; Fabian Hueske ; dev 
>> ; user 
>> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>> 
>> Would it maybe make sense to provide Flink as an engine on Hive
>> ("Flink-on-Hive")? E.g., to address 4, 5, 6, 8, 9, 10. This could be more loosely
>> coupled than integrating Hive into all possible Flink core modules and thus
>> introducing a very tight dependency on Hive in the core.
>> 1, 2, 3 could be achieved via a connector based on the Flink Table API.
>> Just a proposal to start this endeavour as independent projects (Hive
>> engine, connector) to avoid too tight a coupling with Flink. Maybe in a more
>> distant future, if the Hive integration is heavily demanded, one could then
>> integrate it more tightly if needed.
>> 
>> What is meant by 11?
>> On Oct 11, 2018, at 05:01, Zhang, Xuefu wrote:
>> 
>> Hi Fabian/Vno,
>> 
>> Thank you very much for your encouragement and inquiry. Sorry that I didn't see
>> Fabian's email until I read Vino's response just now. (Somehow Fabian's went 
>> to the spam folder.)
>> 
>> My proposal contains long-term and short-term goals. Nevertheless, the
>> effort will focus on the following areas, including Fabian's list:
>> 
>> 1. Hive metastore connectivity - This covers both read/write access, which
>> means Flink can make full use of Hive's metastore as its catalog (at least
>> for batch, but it can be extended for streaming as well).
>> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.)
>> created by Hive can be understood by Flink, and the reverse direction is true
>> as well.
>> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed
>> by Flink and vice versa.
>> 4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its
>> own impl

Re: FlinkKafkaProducer and Confluent Schema Registry

2018-10-13 Thread Olga Luganska
Any suggestions?

Thank you

Sent from my iPhone

On Oct 9, 2018, at 9:28 PM, Olga Luganska <trebl...@hotmail.com> wrote:

Hello,

I would like to use the Confluent Schema Registry in my streaming job.
I was able to make it work with the help of a generic Kafka producer and a
FlinkKafkaConsumer which uses ConfluentRegistryAvroDeserializationSchema.


FlinkKafkaConsumer011<GenericRecord> consumer = new FlinkKafkaConsumer011<>(
        MY_TOPIC,
        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, SCHEMA_URI),
        kafkaProperties);


My question: is it possible to implement producer logic in the
FlinkKafkaProducer to serialize the message and store the schema id in the
Confluent Schema Registry?


I don't think this is going to work with the current interface, because creation
and caching of the schema id in the Confluent Schema Registry is done with the
help of io.confluent.kafka.serializers.KafkaAvroSerializer.class, and all
FlinkKafkaProducer constructors take either a SerializationSchema or a
KeyedSerializationSchema (part of Flink's own serialization stack) as one of
the parameters.
If my assumption is wrong, could you please provide details of the implementation?
Thank you very much,
Olga
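
One possible direction, just as a sketch of the idea rather than a tested or
official solution (the wrapper class and its fields below are made up; only
KafkaAvroSerializer and Flink's SerializationSchema are existing APIs): wrap
Confluent's KafkaAvroSerializer in a SerializationSchema and pass that to one
of the FlinkKafkaProducer constructors that accept a SerializationSchema.

import java.util.Collections;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;

// Hypothetical wrapper: delegates to Confluent's KafkaAvroSerializer, which
// registers/looks up the schema id in the Schema Registry on serialize().
public class ConfluentAvroSerializationSchema implements SerializationSchema<GenericRecord> {

    private final String topic;
    private final String schemaRegistryUrl;
    private transient KafkaAvroSerializer serializer;

    public ConfluentAvroSerializationSchema(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public byte[] serialize(GenericRecord element) {
        if (serializer == null) {
            // Created lazily so the (non-serializable) Confluent serializer is
            // built on the task manager rather than shipped with the job graph.
            serializer = new KafkaAvroSerializer();
            serializer.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
                    false /* isKey */);
        }
        return serializer.serialize(topic, element);
    }
}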











Re: Questions in sink exactly once implementation

2018-10-13 Thread 徐涛
Hi Hequn,
Thanks a lot for your response. I have a few questions about this
topic. Would you please help me with them?
1. I have heard of an idempotent way but I do not know how to implement it;
would you please enlighten me about it with an example?
2. If dirty data are added but not updated, then overwrite alone is not
enough, I think.
3. If using two-phase commit, the sink must support transactions.
3.1 If the sink does not support transactions, for example
Elasticsearch, do I have to use the idempotent way to implement exactly-once?
3.2 If the sink supports transactions, for example MySQL, the
idempotent way and two-phase commit are both OK. But like you say, if there are a
lot of items between checkpoints, the batch insert is a heavy action, so I still
have to use the idempotent way to implement exactly-once.


Best
Hequn

> On Oct 13, 2018, at 11:43 AM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> Yes, exactly-once using the atomic way is heavy for MySQL. However, you don't
> have to buffer data if you choose option 2. You can simply overwrite old
> records with new ones if the result data is idempotent, and this way can also
> achieve exactly-once.
> There is a document about End-to-End Exactly-Once Processing in Apache 
> Flink[1], which may be helpful for you.
> 
> Best, Hequn
> 
> [1] 
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>  
> 
> 
> 
> 
> On Fri, Oct 12, 2018 at 5:21 PM 徐涛 wrote:
> Hi 
> I am reading the book “Introduction to Apache Flink”, and the book
> mentions two ways to achieve sink exactly-once:
> 1. The first way is to buffer all output at the sink and commit this 
> atomically when the sink receives a checkpoint record.
> 2. The second way is to eagerly write data to the output, keeping in 
> mind that some of this data might be “dirty” and replayed after a failure. If 
> there is a failure, then we need to roll back the output, thus overwriting 
> the dirty data and effectively deleting dirty data that has already been 
> written to the output.
> 
> I read the code of the Elasticsearch sink, and found there is a
> flushOnCheckpoint option; if set to true, the changes will accumulate until a
> checkpoint is made. I guess it will guarantee at-least-once delivery, because
> although it uses a batch flush, the flush is not an atomic action, so it
> cannot guarantee exactly-once delivery.
> 
> My questions are:
> 1. As many sinks do not support transactions, in this case I have to
> choose option 2 to achieve exactly-once. In that case, I have to buffer all the
> records between checkpoints and delete them, which is a rather heavy action.
> 2. I guess a MySQL sink should support exactly-once delivery, because
> it supports transactions, but in this case I have to execute the batch according
> to the number of actions between checkpoints rather than a specific number, 100
> for example. When there are a lot of items between checkpoints, it is a heavy
> action as well.
> 
> Best
> Henry



Re: Are savepoints / checkpoints co-ordinated?

2018-10-13 Thread vino yang
Hi Anand,

About "Cancel with savepoint" congxian is right.

And for the duplicates, you should use the Kafka producer transactions
(available since Kafka 0.11), which provide the EXACTLY_ONCE semantic [1].

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html#kafka-011
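
For example, a minimal sketch of enabling the transactional producer (topic
name and properties are placeholders, and an existing DataStream<String>
stream is assumed):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
// Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "output-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

// "stream" is an existing DataStream<String> (assumed).
stream.addSink(producer);

Note that the downstream consumers also need isolation.level=read_committed,
otherwise they will still see the uncommitted writes.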


Congxian Qiu wrote on Fri, Oct 12, 2018 at 7:55 PM:

> AFAIK, "Cancel Job with Savepoint" will stop the checkpoint scheduler -->
> trigger a savepoint, then cancel your job. There will be no more checkpoints.
>
>  wrote on Fri, Oct 12, 2018 at 1:30 AM:
>
>> Hi,
>>
>>
>>
>> I had a couple of questions about savepoints / checkpoints.
>>
>>
>>
>> When I issue "Cancel Job with Savepoint", how is that instruction
>> coordinated with checkpoints? Can I be certain the savepoint will be the last
>> operation (i.e., no more checkpoints)?
>>
>>
>>
>> I have a Kafka source > operation > Kafka sink task in Flink. And it looks like
>> on restart from the savepoint there are duplicates written to the sink
>> topic in Kafka. The dupes overlap with the last few events prior to the
>> savepoint, and I am trying to work out what could have happened.
>>
>> My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but I use
>> env.enableCheckpointing(parameters.getInt("checkpoint.interval"),
>> CheckpointingMode.EXACTLY_ONCE).
>>
>> I thought at-least-once still implies that flushes to Kafka only occur
>> with a checkpoint.
>>
>>
>>
>> One theory is that a further checkpoint occurred after/during the savepoint,
>> which would have flushed events to Kafka that are not in my savepoint.
>>
>>
>>
>> Any pointers to schoolboy errors I may have made would be appreciated.
>>
>>
>>
>> -
>>
>> Also, am I right in thinking that if I have managed state with the RocksDB
>> backend that is using 1 GB on disk, but substantially less keyed state in
>> memory, a savepoint needs to save the full 1 GB to complete?
>>
>>
>>
>> Regards
>>
>> Anand
>>
>
>
> --
> Blog:http://www.klion26.com
> GTalk:qcx978132955
> Everything as the heart wishes
>


Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

2018-10-13 Thread Juan Miguel Cejuela
I’m using both a local (Unix) file system and hdfs.

I’m going to check those to get ideas, thank you!

I’m also checking the internal code of the class and my own older patch
code.
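
For reference, a minimal sketch of how this kind of monitoring source is
usually wired up (the path and scan interval are placeholders), since
readFile() with PROCESS_CONTINUOUSLY is what creates the
ContinuousFileMonitoringFunction under the hood:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String path = "hdfs:///data/incoming";

// readFile() with PROCESS_CONTINUOUSLY internally uses ContinuousFileMonitoringFunction
// to scan the directory at the given interval and a reader operator to read the splits.
DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path(path)),
        path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10_000L); // re-scan interval in milliseconds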
On Fri 12. Oct 2018 at 21:32, Fabian Hueske  wrote:

> Hi,
>
> Which file system are you reading from? If you are reading from S3, this
> might be caused by S3's eventual consistency property.
> Have a look at FLINK-9940 [1] for a more detailed discussion.
> There is also an open PR [2], that you could try to patch the source
> operator with.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9940
> [2] https://github.com/apache/flink/pull/6613
>
> Am Fr., 12. Okt. 2018 um 20:41 Uhr schrieb Juan Miguel Cejuela <
> jua...@tagtog.net>:
>
>> Dear flinksters,
>>
>>
>> I'm using the class `ContinuousFileMonitoringFunction` as a source to
>> monitor a folder for new incoming files. *I have the problem that not
>> all the files that are sent to the folder get processed / triggered by the
>> function.* Specific details of my workflow are that I send up to 1k files
>> per minute, and that I consume the stream as an `AsyncDataStream`.
>>
>> I myself raised an unrelated issue with the
>> `ContinuousFileMonitoringFunction` class some time ago (
>> https://issues.apache.org/jira/browse/FLINK-8046): if two or more files
>> shared the very same timestamp, only the first one (non-deterministically
>> chosen) would be processed. However, I patched the file myself to fix that
>> problem by using a LinkedHashMap to remember which files had been really
>> processed before or not. My patch is working fine as far as I can tell.
>>
>> The problem seems to be rather that some files (when many are sent at
>> once to the same folder) do not even get triggered/activated/registered by
>> the class.
>>
>>
>> Am I properly explaining my problem?
>>
>>
>> Any hints to solve this challenge would be greatly appreciated ! ❤ THANK
>> YOU
>>
>> --
>> Juanmi, CEO and co-founder @ 🍃tagtog.net
>>
>> Follow tagtog updates on 🐦 Twitter: @tagtog_net
>> 
>>
>> --
Juanmi, CEO and co-founder @ 🍃tagtog.net

Follow tagtog updates on 🐦 Twitter: @tagtog_net



Re: Mapstatedescriptor

2018-10-13 Thread Dominik Wosiński
Hey,
It's the name for the whole descriptor, not for the keys; it means that no
other descriptor should be created with the same name.

Best Regards,
Dom.
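
For example, a minimal sketch (the class, state, and key names are made up): a
single descriptor registered under one unique name is enough, and Flink scopes
the actual map to the current key of the keyed stream for you:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical window function keyed by String, consuming Long elements.
public class MyWindowFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    // One descriptor, one unique name, shared by all keys.
    private static final MapStateDescriptor<String, Long> DESC =
            new MapStateDescriptor<>("my-map-state", String.class, Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Long> elements, Collector<String> out) throws Exception {
        // The MapState returned here is automatically scoped to the current key
        // (use ctx.windowState() instead to additionally scope it to the window).
        MapState<String, Long> perKeyMap = ctx.globalState().getMapState(DESC);
        perKeyMap.put("lastWindowEnd", ctx.window().getEnd());
        out.collect(key);
    }
}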

Sat., Oct 13, 2018, 09:50, Szymon wrote:

>
>
> Hi, I have a question about the MapStateDescriptor used to create MapState.
> I have a keyed stream and a ProcessWindowFunction where I want to use
> MapState. And the question is: in the MapStateDescriptor constructor
>
> public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass)
>
> must the "name" be unique for each key, or is this only the
> name/description?
>
> Best Regards
> Szymon
>
>
>
>
>


Mapstatedescriptor

2018-10-13 Thread Szymon
Hi, I have a question about the MapStateDescriptor used to create MapState. I have
a keyed stream and a ProcessWindowFunction where I want to use MapState. And the
question is: in the MapStateDescriptor constructor

public MapStateDescriptor(String name,
                          Class<UK> keyClass,
                          Class<UV> valueClass)

must the "name" be unique for each key, or is this only the name/description?

Best Regards
Szymon