Re: checkpoints/.../shared cleanup

2021-08-30 Thread Khachatryan Roman
Hi,

I think the documentation is correct. Once the job is stopped with
savepoint, any of its "regular" checkpoints are discarded, and as a
result any shared state gets unreferenced and is also discarded.
Savepoints currently do not have shared state.
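
To illustrate the reference counting described above, here is a minimal
plain-Java sketch. It is not Flink's actual implementation: the class
SharedStateModel, its methods and the file names are hypothetical. It only
models the idea that a shared file is deleted once no remaining checkpoint
references it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of reference-counted shared checkpoint files (not Flink code).
final class SharedStateModel {
    // shared file name -> number of live checkpoints referencing it
    private final Map<String, Integer> refCounts = new HashMap<>();
    // checkpoint id -> shared files it references
    private final Map<Long, List<String>> checkpoints = new HashMap<>();

    // Register a completed checkpoint together with the shared files it uses.
    void register(long checkpointId, List<String> sharedFiles) {
        checkpoints.put(checkpointId, new ArrayList<>(sharedFiles));
        for (String file : sharedFiles) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    // Discard a checkpoint; returns the shared files that lost their last
    // reference and would therefore be deleted.
    Set<String> discard(long checkpointId) {
        Set<String> deleted = new TreeSet<>();
        List<String> files = checkpoints.remove(checkpointId);
        if (files == null) {
            return deleted;
        }
        for (String file : files) {
            int remaining = refCounts.merge(file, -1, Integer::sum);
            if (remaining == 0) {
                refCounts.remove(file);
                deleted.add(file);
            }
        }
        return deleted;
    }

    // Shared files still referenced by at least one checkpoint.
    Set<String> liveFiles() {
        return new TreeSet<>(refCounts.keySet());
    }

    public static void main(String[] args) {
        SharedStateModel model = new SharedStateModel();
        model.register(1L, List.of("sst-a", "sst-b"));
        model.register(2L, List.of("sst-b", "sst-c"));
        System.out.println(model.discard(1L)); // only sst-a loses its last reference
        System.out.println(model.discard(2L)); // sst-b and sst-c become unreferenced
        System.out.println(model.liveFiles()); // nothing is left
    }
}
```

In this model, files can only linger if some checkpoint still references
them or if the (asynchronous) deletion has not run yet.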

Furthermore, the new job should have a new ID and therefore a new folder.
Are you referring to the old folders?

However, the removal process is asynchronous and the client doesn't
wait for all the artifacts to be removed.
Then the cluster will wait for removal to complete before termination.
Are you running Flink in session mode?

Regards,
Roman

On Fri, Aug 27, 2021 at 8:05 AM Alexey Trenikhun  wrote:
>
> "the shared subfolder still grows" - while upgrading job, we cancel job with 
> savepoint. My expectation is that Flink will clean up the checkpoints, including
> the shared directory, since checkpoints are not retained. Then we start the
> upgraded job from the savepoint; however, when I look into the shared folder I
> see older files from the previous version of the job. This upgrade process is
> repeated, and as a result the shared subfolder grows and grows.
>
> Thanks,
> Alexey
> 
> From: Alexey Trenikhun 
> Sent: Thursday, August 26, 2021 6:37:27 PM
> To: Matthias Pohl 
> Cc: Flink User Mail List ; sjwies...@gmail.com 
> 
> Subject: Re: checkpoints/.../shared cleanup
>
> Hi Matthias,
>
> I don't use externalized checkpoints (in the Flink UI, Persist Checkpoints
> Externally: Disabled), so why do you think checkpoint(s) should be retained? It
> kind of contradicts the documentation [1] - Checkpoints are by default not
> retained and are only used to resume a job from failures.
>
> [1] - 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>
> Thanks,
> Alexey
> 
> From: Matthias Pohl 
> Sent: Thursday, August 26, 2021 5:42 AM
> To: Alexey Trenikhun 
> Cc: Flink User Mail List ; sjwies...@gmail.com 
> 
> Subject: Re: checkpoints/.../shared cleanup
>
> Hi Alexey,
> thanks for reaching out to the community. I have a question: What do you mean 
> by "the shared subfolder still grows"? As far as I understand, the shared 
> folder contains the state of incremental checkpoints. If you cancel the 
> corresponding job and start a new job from one of the retained incremental 
> checkpoints, it is required for the shared folder of the previous job to be 
> still around since it contains the state. The new job would then create its 
> own shared subfolder. Any new incremental checkpoints will write their state 
> into the new job's shared subfolder while still relying on shared state of 
> the previous job for older data. The RocksDB Backend is in charge of 
> consolidating the incremental state.
>
> Hence, you should be careful with removing the shared folder in case you're 
> planning to restart the job later on.
>
> I'm adding Seth to this thread. He might have more insights and/or correct my 
> limited knowledge of the incremental checkpoint process.
>
> Best,
> Matthias
>
> On Wed, Aug 25, 2021 at 1:39 AM Alexey Trenikhun  wrote:
>
> Hello,
> I use incremental checkpoints, not externalized. Should the content of
> checkpoint/.../shared be removed when I cancel the job (or cancel with
> savepoint)? It looks like in our case shared continues to grow...
>
> Thanks,
> Alexey


Re: Flink custom trigger use case

2021-02-23 Thread Khachatryan Roman
Hi Diwakar,

I'm not sure I fully understand your question.
If event handling in one window depends on some other windows, then
TriggerContext.getPartitionedState cannot be used. Triggers don't have
access to the global state (only to key-window scoped state).
If that's what you want then please consider ProcessWindowFunction [1]
where you can use context.globalState() in your process function.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
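
To illustrate the scoping difference, here is a small plain-Java model. It
is not Flink API code: the class FirstEventScopeModel and its method are
hypothetical. It assumes that window-scoped state is addressed by
(key, window) and global state by key only, and shows which events would
be emitted early under each scope.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model (not the Flink API) contrasting window-scoped and key-global state.
final class FirstEventScopeModel {
    // Returns the indices of the events that would be emitted early.
    // Each event is {key, windowId}; the flag selects where "first seen" is stored.
    static List<Integer> earlyFires(String[][] events, boolean keyGlobalScope) {
        Set<String> seen = new HashSet<>();
        List<Integer> fired = new ArrayList<>();
        for (int i = 0; i < events.length; i++) {
            String key = events[i][0];
            String window = events[i][1];
            // Window-scoped state is addressed by (key, window); global state by key only.
            String stateAddress = keyGlobalScope ? key : key + "@" + window;
            if (seen.add(stateAddress)) {
                fired.add(i); // first time this state address is touched
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Five records with the same key: two in window-1, three in window-2.
        String[][] events = {
            {"k", "w1"}, {"k", "w1"}, {"k", "w2"}, {"k", "w2"}, {"k", "w2"}
        };
        System.out.println(earlyFires(events, false)); // [0, 2]: fires once per window
        System.out.println(earlyFires(events, true));  // [0]: fires once per key
    }
}
```

With window-scoped state the "first seen" flag starts empty in every
window, which matches the behaviour you currently observe.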

Regards,
Roman


On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha  wrote:

>
> Hello,
>
> I'm trying to use a custom trigger for one of my use cases. I have a basic
> logic (as shown below) of using keyBy on the input stream and using a
> window of 1 min.
>
> .keyBy()
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .trigger(new CustomTrigger())
> .aggregate(Input.getAggregationFunction(), new
> AggregationProcessingWindow());
>
>
> My custom trigger is expected to fire the first event of the keyBy
> instantly and any subsequent events should be aggregated in the window.
>
> .trigger(new Trigger<Record, TimeWindow>() {
>>     @Override
>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         ValueState<Boolean> firstSeen =
>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>         if (firstSeen.value() == null) {
>>             firstSeen.update(true);
>>             // fire trigger to early evaluate window and purge that event.
>>             return TriggerResult.FIRE_AND_PURGE;
>>         }
>>         // Continue. Do not evaluate window per element
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         // final evaluation and purge window state
>>         return TriggerResult.FIRE_AND_PURGE;
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>             throws Exception {
>>     }
>> })
>
>
>
>
> Currently, I see (for each window and same key) the first event of the
> window is always fired. But I want to see this happening for only the first
> window and for the subsequent window it should aggregate all the events and
> then fire.
>
> Example : all the records have the same key.
> current output.
> record 1 : first event in the window-1 : fired
> record 2 : last event in the window-1 : fired
> record 3 : first event in the window-2 : fired
> record 4, record 5 : - 2 events in the window-2 : fired.
>
> expected output.
> record 1 : first event in the window-1 : fired
> record 2 : last event in the window-1 : fired
> record 3,4,5 : all event in the window-2 : fired
> window-2 should not fire the first event of the same key.
>
> I'm reading it here
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
> but not able to solve it. Any pointers would be helpful.
>
> Thanks.
>


Re: Julia API/Interface for Flink

2021-02-23 Thread Khachatryan Roman
Hi,

AFAIK there is no direct support for Julia in Flink currently.
However, you may try to call Python from Julia using either Statefun Python
SDK [1] or PyFlink [2]; or implement a remote Statefun module [3].

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/python.html
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/
[3]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/

Regards,
Roman


On Mon, Feb 22, 2021 at 8:49 PM Beni Bilme  wrote:

> Hello,
>
> Is there a julia api or interface for using flink?
>
> Thanks in advance for any response.
>
> Beni
>
>
>


Re: BroadcastState dropped when data deleted in Kafka

2021-02-23 Thread Khachatryan Roman
Hi,

Deletion of messages in Kafka shouldn't affect Flink state in general.
Probably, some operator in your pipeline is re-reading the topic
and overwriting the state, dropping what was deleted by Kafka.
Could you share the code?

Regards,
Roman


On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:

> Hi,
>
> I have 2 streams one event data and the other rules. I broadcast the rules
> stream and then key the data stream on event type. The connected stream is
> processed thereafter.
> We faced an issue where the rules data in the topic got deleted because of
> Kafka retention policy.
> Post this the existing rules data also got dropped in the broadcast state
> and the processing stopped.
>
> As per my understanding the rules which were present in broadcast state
> should still exist even if the data was deleted in Kafka as the rules data
> was already processed and stored in state map.
>
> PS: I’m reusing the rules stream as broadcast later in processing as well.
> Could this be an issue?
>
> Thanks,
> Hemant
>


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Khachatryan Roman
Hi,

Probably another solution would be to register a timer
(using KeyedProcessFunction) once we see an element after keyBy. The timer
will fire in windowIntervalMs. Upon firing, it will emit a dummy element
which will be ignored (or subtracted) in the end.
Upon receiving each new element, the function will shift the timer
accordingly.
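
A rough plain-Java model of this idea follows. It is not Flink code: the
class ZeroCountTimerModel and its methods are hypothetical, and time is
passed in explicitly instead of coming from a timer service. Each element
shifts the per-key timer to now + windowIntervalMs; when the timer fires,
the key has seen no element for a whole window, so a zero can be emitted
and the per-key state dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of a per-key timer that is shifted on every element.
final class ZeroCountTimerModel {
    private final long windowIntervalMs;
    // key -> time at which the timer for this key fires
    private final Map<String, Long> timerByKey = new HashMap<>();

    ZeroCountTimerModel(long windowIntervalMs) {
        this.windowIntervalMs = windowIntervalMs;
    }

    // Called for every element: (re-)register the timer relative to the element time.
    void onElement(String key, long nowMs) {
        timerByKey.put(key, nowMs + windowIntervalMs);
    }

    // Advance time; returns the keys whose timer fired, i.e. the keys for which
    // a zero count (or dummy element) would be emitted. Fired keys are removed,
    // so no state is kept for keys that stay silent.
    List<String> advanceTo(long nowMs) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = timerByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                fired.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(fired);
        return fired;
    }

    public static void main(String[] args) {
        ZeroCountTimerModel model = new ZeroCountTimerModel(1000L);
        model.onElement("sensor-a", 0L);
        model.onElement("sensor-a", 600L);           // shifts the timer to 1600
        System.out.println(model.advanceTo(1000L));  // nothing fires yet
        System.out.println(model.advanceTo(1600L));  // sensor-a fires, count is zero
    }
}
```

Removing the entry when the timer fires is what keeps the state from
growing for keys that never send data again.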

Regards,
Roman


On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch 
wrote:

> Hi Yun,
>
> thanks for your reply.
>
> I do agree with your point about standard windows being for high level
> operations and the lower-level apis offering a rich toolset for most
> advanced use cases.
>
> I have tried to solve my problem with keyedProcessFunctions also but was
> not able to get it to work for two reasons:
>
> 1) I was not able to set up a combination of ValueState, Timers and
> Triggers that emulated a sliding window with a rising and falling count
> (including 0) good enough.
>
> 2) Memory Leak: States / Windows should be cleared after a certain time of
> being at count 0 in order to prevent an ever-growing number of ValueStates
> (that are not needed anymore)
>
>
> Can you maybe please elaborate in pseudocode how you would envision your
> solution?
>
>
> Best regards
>
> Jan
> On 08.02.21 05:31, Yun Gao wrote:
>
> Hi Jan,
>
> From my view, I think in Flink Window should be as a "high-level"
> operation for some kind
> of aggregation operation and if it could not satisfy the requirements, we
> could at least turn to
> using the "low-level" api by using KeyedProcessFunction[1].
>
> In this case, we could use a ValueState to store the current value for
> each key, and increment
> the value on each element. Then we could also register a timer for each key
> on receiving the first
> element for this key,  and in the onTimer callback, we could send the
> current state value, update
> the value to 0 and register another timer for this key after 30s.
>
> Best,
>  Yun
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> --Original Mail --
> *Sender:*Jan Brusch 
> 
> *Send Date:*Sat Feb 6 23:44:00 2021
> *Recipients:*user  
> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>
>> Hi,
>> I was recently working on a problem where we wanted to implement a
>> simple count on a sliding window, e.g. "how many messages of a certain
>> type were emitted by a certain type of sensor in the last n minutes".
>> Which sounds simple enough in theory:
>>
>> messageStream
>>  .keyBy(//EmitterType + MessageType)
>>  .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>> Time.seconds(30)))
>>  .map(_ => 1)
>>  .reduce((x,y) => x + y)
>>  .addSink(...)
>>
>> But there is a tricky edge case: The downstream systems will never know
>> when the count for a certain key goes back to 0, which is important for
>> our use case. The technical reason being that flink doesn't open a
>> window if there are no entries, i.e. a window with count 0 doesn't exist
>> in flink.
>>
>> We came up with the following solution for the time being:
>>
>> messageStream
>>  .keyBy(//EmitterType + MessageType)
>>  .window(GlobalWindows.create())
>>  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>  .evictor(// CustomEvictor: Evict all messages older than n minutes
>> BEFORE processing the window)
>>  .process(// CustomCounter: Count all Messages in Window State);
>>  .addSink(...)
>>
>> In the case of zero messages in the last n minutes, all messages will be
>> evicted from the window and the process-function will get triggered one
>> last time on the now empty window, so we can produce a count of 0.
>>
>> I have two problems, though, with this solution:
>> 1) It is computationally inefficient for a simple count, as custom
>> process functions will always keep all messages in state. And, on every
>> trigger all elements will have to be touched twice: To compare the
>> timestamp and to count.
>> 2) It does seem like a very roundabout solution to a simple problem.
>>
>> So, I was wondering if there was a more efficient or "flink-like"
>> approach to this. Sorry for the long writeup, but I would love to hear
>> your takes.
>>
>>
>> Best regards
>> Jan
>>
>> --
>> neuland  – Büro für Informatik GmbH
>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>> Telefon (0421) 380107 57
>> Fax (0421) 380107 99
>> https://www.neuland-bfi.de
>>
>> https://twitter.com/neuland
>> https://facebook.com/neulandbfi
>> https://xing.com/company/neulandbfi
>>
>>
>> Geschäftsführer: Thomas Gebauer, Jan Zander
>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>> USt-ID. DE 246585501
>

Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Khachatryan Roman
Hi,

Could you provide the exception stack trace?

Regards,
Roman


On Mon, Feb 8, 2021 at 3:46 PM joris.vanagtmaal <
joris.vanagtm...@wartsila.com> wrote:

> I'm trying to read data from my eventhub in Azure, but i end up with the
> Flink error message 'findAndCreateTableSource failed'
>
> using Flink 1.13-Snapshot
>
> source_ddl = f"""CREATE TABLE dms_source(
> x_value VARCHAR
>  ) WITH (
>   'connector.type' = 'Kafka',
>   'connector.version' = 'universal',
>   'connector.partition' = '0',
>   'connector.sasl.jaas.config'=
> 'org.apache.kafka.common.security.plain.PlainLoginModule required
> username="$ConnectionString"
> password="Endpoint=sb://**EVEN_HUB_NAME**.
> servicebus.windows.net/;SharedAccessKeyName=**KEY_
>
> NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
>   'connector.sasl.mechanism' = 'PLAIN',
>   'connector.security.protocol' = 'SASL_SSL',
>   'connector.properties.bootstrap.servers' =
> '**EVEN_HUB_NAME**.servicebus.windows.net:9093',
>   'connector.properties.group.id' = '$Default',
>   'connector.startup-mode' = 'latest-offset',
>   'format.type' = 'json')
> """
>
>  Any tips on how to debug this?
>
>
>
>


Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Khachatryan Roman
Hi,

I'm pulling in Yun Tang who is familiar with StateBackends and RocksDB in
particular.

From what I see, the 2nd snapshot (sp2) is built using the same set of
states obtained from the starting savepoint/checkpoint (sp1) to write its
metadata. This metadata includes serializer snapshots, including the
PojoSerializer for your custom type. On restore, this metadata is read, and
the POJO class itself is loaded.

I see the following ways to overcome this issue:
1. Use the State Processor API to create a new snapshot [1]
2. If the operator has only this state then changing uid (together with
allowNonRestoredState) should help
3. Probably just changing POJO to an empty class will suffice in your case?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Regards,
Roman


On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim  wrote:

> Hi 张静,
>
> Q1: By default, a savepoint restore will try to match all state
>> back to the restored job. `AllowNonRestoredState` cannot prevent
>> all state from being recovered from the savepoint; it only skips matching
>> all of the restored state back to the restored job. So the
>> `ClassNotFoundException` cannot be avoided.
>
> okay
>
>Q2: Not really. After you recover the new job from the savepoint
>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>> then remove the definition of the POJO type. Then you can restore from
>> savepoint2.
>>
> I did it but it ends up with the same ClassNotFoundException :-(
>
> What I did exactly are
> (1) Trigger sp1 from v1
> (2) Start v2-1 (w/ the definition of the POJO but do not use it at all)
> from sp1
> (3) Trigger sp2 from v2-1
> (4) Start v2-2 (w/o the definition of the POJO)  from sp2
> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO
> type
>
> Should v2-2 successfully start from sp2?
>
> Best,
>
> Dongwon
>
>
>
>
>
>
> On Mon, Feb 8, 2021 at 11:48 PM 张静  wrote:
>
>> Hi, Dongwon,
>>  Q1: By default, a savepoint restore will try to match all state
>> back to the restored job. `AllowNonRestoredState` cannot prevent
>> all state from being recovered from the savepoint; it only skips matching
>> all of the restored state back to the restored job. So the
>> `ClassNotFoundException` cannot be avoided.
>>  Q2: Not really. After you recover the new job from the savepoint
>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>> then remove the definition of the POJO type. Then you can restore from
>> savepoint2.
>> Correct me please if I'm wrong. Thanks.
>>
>> Best,
>> Beyond1920
>>
>> Dongwon Kim  于2021年2月8日周一 下午9:43写道:
>> >
>> > Hi,
>> >
>> > I have an original job (say v1) and I want to start a new job (say v2)
>> from a savepoint of v1.
>> >
>> > An operator of v1 used to have per-key states of a POJO type, but I
>> want to remove the states together with the definition of the POJO type.
>> >
>> > When I start v2 from a savepoint of v1, I specified
>> "--allowNonRestoredState" but  I got the following exception:
>> >
>> > 2021-02-08 22:07:28,324 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>> input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
>> RUNNING to FAILED on container_e02_1607261469522_0242_01_08 @
>> mobdata-flink-dn29.dakao.io (dataPort=45505).
>> > java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
>> > Caused by: org.apache.flink.util.FlinkException: Could not restore
>> keyed state backend for
>> CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from
>> any of the 1 provided restore options.
>> > at
>> 

Re: Cannot connect to queryable state proxy

2021-02-08 Thread Khachatryan Roman
Hi ChangZhuo,

Queryable state is exposed on the same address as the TM RPC. You can
change this address by modifying taskmanager.host [1].
However, I'm not sure whether setting it to 127.0.0.1 or localhost would
break connectivity with the other components.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#taskmanager-host

Regards,
Roman


On Sun, Feb 7, 2021 at 2:20 PM ChangZhuo Chen (陳昌倬) 
wrote:

> On Thu, Feb 04, 2021 at 04:26:42PM +0800, ChangZhuo Chen (陳昌倬) wrote:
> > Hi,
> >
> > We have problem connecting to queryable state client proxy as described
> > in [0]. Any help is appreciated.
> >
> > * The port 6125 is opened in taskmanager pod.
> >
> >   ```
> >   root@-654b94754d-2vknh:/tmp# ss -tlp
> >   State    Recv-Q   Send-Q   Local Address:Port   Peer Address:Port   Process
> >   LISTEN   0        1024     0.0.0.0:46561        0.0.0.0:*
> >   LISTEN   0        3        0.0.0.0:9249         0.0.0.0:*
> >   LISTEN   0        1024     0.0.0.0:6122         0.0.0.0:*
> >   LISTEN   0        1024     10.200.11.3:9067     0.0.0.0:*
> >   LISTEN   0        1024     10.200.11.3:6125     0.0.0.0:*
> >   LISTEN   0        1024     0.0.0.0:38607        0.0.0.0:*
> >   ```
>
> The problem is that Flink only listens on 10.200.11.3:6125 for the queryable
> state client proxy, so we need to use the correct network to connect to it.
> Is there any way we can make Flink listen on 0.0.0.0 for the queryable
> state client proxy?
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: question on ValueState

2021-02-08 Thread Khachatryan Roman
Hi,

I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the value
on update.
As for "value()", it may (de)serialize it and return a copy if an async
snapshot is in progress (to protect from modifications). This
shouldn't happen often though.
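
The difference can be illustrated with a small plain-Java model. It is not
Flink code: ReferenceState and SerializedState are hypothetical classes,
and Java serialization stands in for Flink's type serializers. The first
keeps the object reference (like the heap-based backends), the second
stores bytes and returns a fresh copy on every read (like RocksDB).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Plain-Java model (not Flink code) of reference-holding vs. serializing state.
final class StateCopyModel {
    // Heap-like state: update() stores the reference, value() returns the same object.
    static final class ReferenceState<T> {
        private T value;

        T value() {
            return value;
        }

        void update(T newValue) {
            value = newValue;
        }
    }

    // Serializing state: update() stores bytes, value() deserializes a new copy.
    static final class SerializedState<T extends Serializable> {
        private byte[] bytes;

        void update(T newValue) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(newValue);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            bytes = buffer.toByteArray();
        }

        @SuppressWarnings("unchecked")
        T value() {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        ReferenceState<ArrayList<Long>> heapLike = new ReferenceState<>();
        ArrayList<Long> first = new ArrayList<>();
        heapLike.update(first);
        first.add(42L); // mutated after update(), without calling update() again
        System.out.println(heapLike.value().size()); // prints 1: the change is visible

        SerializedState<ArrayList<Long>> serialized = new SerializedState<>();
        ArrayList<Long> second = new ArrayList<>();
        serialized.update(second);
        second.add(42L); // same mutation, but the stored bytes are unchanged
        System.out.println(serialized.value().size()); // prints 0: the change is lost
    }
}
```

So code that mutates the returned object without calling update() only
works by accident of the backend; calling update() after each change is
the safe choice if the backend may change.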

Regards,
Roman


On Mon, Feb 8, 2021 at 3:24 AM Yun Tang  wrote:

> Hi,
>
> MemoryStateBackend and FsStateBackend both hold keyed state in
> HeapKeyedStateBackend [1], and the main structure to store data is
> StateTable [2] which holds POJO format objects. That is to say, the object
> would not be serialized when calling update().
> On the other hand, RocksDB statebackend would store value with serialized
> bytes.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>
> Best
> Yun Tang
>
> --
> *From:* Colletta, Edward 
> *Sent:* Sunday, February 7, 2021 19:53
> *To:* user@flink.apache.org 
> *Subject:* question on ValueState
>
>
> Using FsStateBackend.
>
>
>
> I was under the impression that ValueState.value will serialize an object
> which is stored in the local state backend, copy the serialized object and
> deserialize it.  Likewise update() would do the same steps copying the
> object back to local state backend. And as a consequence, storing
> collections in ValueState is much less efficient than using ListState or
> MapState if possible.
>
>
>
> However, I am looking at some code I wrote a while ago which made the
> assumption that the value() method just returned a reference to the
> object.  The code only calls update() when creating the object if value()
> returns null. Yet the code works, all changes to the object stored in
> state are visible the next time value() is called.   I have some sample
> code below.
>
>
>
> Can someone clarify what really happens when value() is called?
>
>
>
>
>
>     public void processElement(M in, Context ctx, Collector out) throws Exception {
>         MyWindow myWindow;
>         myWindow = windowState.value();
>         if (myWindow == null) {
>             ctx.timerService().registerProcessingTimeTimer(
>                 ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>             myWindow = new MyWindow(0L, slide, windowSize);
>             windowState.update(myWindow);
>             myWindow.eq.add(0L);
>         }
>         myWindow.eq.getTail().setAccumulator(
>             myWindow.eq.getTail().getAccumulator() + in.value);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
>         ctx.timerService().registerProcessingTimeTimer(
>             ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>         MyWindow myWindow = windowState.value();
>         myWindow.slide(0L);
>         out.collect(myWindow.globalAccum);
>     }
>
>
>
>
>


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread Khachatryan Roman
Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.

[1] https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman


On Mon, Feb 8, 2021 at 9:14 AM meneldor  wrote:

> Any help please? Is there a way to use the "Last row" from a deduplication
> in an append-only stream or tell upsert-kafka to not produce *null*
> records in the sink?
>
> Thank you
>
> On Thu, Feb 4, 2021 at 1:22 PM meneldor  wrote:
>
>> Hello,
>> Flink 1.12.1(pyflink)
>> I am deduplicating CDC records coming from Maxwell in a kafka topic.
>> Here is the SQL:
>>
>> CREATE TABLE stats_topic(
>>>   `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>   `ts` BIGINT,
>>>   `xid` BIGINT ,
>>>   row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>   WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'json',
>>>   'topic' = 'stats_topic',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'test_group'
>>> )
>>>
>>> CREATE TABLE sink_table(
>>>   `id` BIGINT,
>>>   `account` INT,
>>>   `upd_ts` BIGINT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'json',
>>>   'topic' = 'sink_topic',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'test_group'
>>> )
>>>
>>>
>>> INSERT INTO sink_table
>>> SELECT
>>> id,
>>> account,
>>> upd_ts
>>> FROM (
>>> SELECT
>>>  id,
>>>  account,
>>>  upd_ts,
>>>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
>>> FROM stats_topic
>>> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>> )
>>> WHERE rownum=1
>>>
>>
>>  As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER()
>> and producing them on a 20-minute interval to the sink_topic. The problem is
>> that Flink doesn't allow me to use it in combination with the kafka
>> connector:
>>
>>> pyflink.util.exceptions.TableException: Table sink
>>> 'default_catalog.default_database.sink_table' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>>> select=[$f0, $f1, $f2])
>>>
>>
>> If I use the *upsert-kafka* connector everything is fine, but then I
>> receive empty JSON records in the sink topic:
>>
>>> {"id": 11, "account": 4, "upd_ts": 1612334952}
>>> {"id": 22, "account": 4, "upd_ts": 1612334953}
>>> {}
>>> {"id": 33, "account": 4, "upd_ts": 1612334955}
>>> {}
>>> {"id": 44, "account": 4, "upd_ts": 1612334956}
>>>
>>
>> Thank you!
>>
>


Re: Jobmanager stopped because uncaught exception

2021-02-08 Thread Khachatryan Roman
Hi,

The open issue you mentioned (FLINK-21053) is about preventing potential
issues in the future.
The issue you are experiencing is most likely FLINK-20992 as Yang Wang said.
So upgrading to 1.12.2 should solve the problem.

Regards,
Roman


On Mon, Feb 8, 2021 at 9:05 AM Lei Wang  wrote:

> I see there's a related issue
> https://issues.apache.org/jira/browse/FLINK-21053 which is still open.
>
> Does it mean a similar issue will still exist even if I upgrade to
> 1.12.2?
>
> Thanks,
> Lei
>
> On Mon, Feb 8, 2021 at 3:54 PM Yang Wang  wrote:
>
>> Maybe it is a known issue[1] that has already been resolved in 1.12.2 (to be
>> released soon).
>> BTW, I think it is unrelated to the aliyun oss info logs.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-20992
>>
>>
>> Best,
>> Yang
>>
>> Lei Wang  于2021年2月8日周一 下午2:22写道:
>>
>>> Flink standalone HA.   Flink version 1.12.1
>>>
>>> 2021-02-08 13:57:50,550 ERROR
>>> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
>>> Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
>>> process...
>>> java.util.concurrent.RejectedExecutionException: Task
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
>>> rejected from 
>>> java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>>> ~[?:1.8.0_275]
>>> at
>>> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> ~[?:1.8.0_275]
>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>>>
>>> Using aliyun oss as statebackend storage.
>>> Before the ERROR, there's a lot of  info message like this:
>>>
>>> 2021-02-08 13:57:50,452 INFO
>>>  org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
>>> [Server]Unable to execute HTTP request: Not Found
>>> [ErrorCode]: NoSuchKey
>>> [RequestId]: 6020D2DEA1E11430349E8323
>>>
>>>
>>> Any insight on this?
>>>
>>> Thanks,
>>> Lei
>>>
>>


Re: Setting different timeouts for savepoints and checkpoints

2021-01-16 Thread Khachatryan Roman
Hi Rex,

Unfortunately not: the same timeout value is used both for savepoints and
checkpoints.

Regards,
Roman


On Sat, Jan 16, 2021 at 9:42 AM Rex Fenley  wrote:

> Hello,
>
> I'm wondering if there's a way to set different timeouts for savepoints
> and checkpoints. Our savepoints can take a number of hours to complete,
> whereas incremental checkpoints at their slowest take around 10 min. We'd
> like to timeout a checkpoint on a significantly smaller duration than a
> savepoint.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Re: Uncaught exception in FatalExitExceptionHandler causing JM crash while canceling job

2021-01-15 Thread Khachatryan Roman
I think you're right, Till; this is the problem.
In fact, I opened a duplicate Jira ticket in parallel :)
I hope we can fix it in the next 1.12 release.

Regards,
Roman
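
As a side note, the failure mode described in the quoted message (a
follow-up action submitted to an executor that has already been stopped)
can be reproduced with plain java.util.concurrent, without Flink. The
sketch below is only an illustration: the class and method names are made
up for this example, and the guarded variant is an assumption about how
such a case could be handled, not the actual fix from FLINK-20992.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

public class RejectedAfterShutdownSketch {

    /** Submits a task to an already stopped executor; returns true if it was rejected. */
    public static boolean submitAfterShutdown() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.shutdownNow(); // stands in for the coordinator being stopped
        try {
            timer.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            // Same exception type as in the reported stack trace.
            return true;
        }
    }

    /**
     * Hypothetical guard: skip the follow-up action if the executor is stopped.
     * Returns true only if the task was handed to the executor.
     */
    public static boolean submitIfRunning(ScheduledExecutorService timer, Runnable followUp) {
        if (timer.isShutdown()) {
            return false;
        }
        try {
            timer.execute(followUp);
            return true;
        } catch (RejectedExecutionException e) {
            // The executor was stopped between the check and the submit.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdown());

        ScheduledExecutorService stopped = Executors.newSingleThreadScheduledExecutor();
        stopped.shutdownNow();
        System.out.println("guarded submit accepted: " + submitIfRunning(stopped, () -> { }));
    }
}
```

The unguarded variant fails because the default rejection policy of the
underlying thread pool throws once the pool is shut down; when that
exception escapes on a thread with a fatal uncaught-exception handler, the
process exits, which matches the log in the original report.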


On Fri, Jan 15, 2021 at 2:09 PM Till Rohrmann  wrote:

> Thanks for reporting and analyzing this issue Kelly. I think you are
> indeed running into a Flink bug. I think the problem is the following: With
> Flink 1.12.0 [1] we introduced a throttling mechanism for discarding
> checkpoints. The way it is implemented is that once a checkpoint is
> discarded it can trigger another action. This is triggering another
> checkpoint in the CheckpointCoordinator. The problem is now that we don't
> properly handle the case when the CheckpointCoordinator has been stopped in
> the meantime (e.g. if the job has reached a terminal state). That's why we
> see this RejectedExecutionException which fails the job. This is definitely
> a bug and I have created this issue [2] for fixing it. I am also pulling in
> Roman who worked on this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17073
> [2] https://issues.apache.org/jira/browse/FLINK-20992
>
> Cheers,
> Till
>
> On Wed, Jan 13, 2021 at 7:30 PM Kelly Smith 
> wrote:
>
>> Hi folks,
>>
>>
>>
>> I recently upgraded to Flink 1.12.0 and I’m hitting an issue where my JM
>> is crashing while cancelling a job. This is causing Kubernetes readiness
>> probes to fail, the JM to be restarted, and then get in a bad state while
>> it tries to recover itself using ZK + a checkpoint which no longer exists.
>>
>>
>>
>> This is the only information being logged before the process exits:
>>
>>
>>
>>
>>
>>  *method*: uncaughtException
>>*msg*: FATAL: Thread 'cluster-io-thread-4' produced an uncaught
>> exception. Stopping the process...
>>*pod*: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
>>*stack*: java.util.concurrent.RejectedExecutionException: Task
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407
>> rejected from 
>> java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>> 25977] at
>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>> at
>> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>> at
>> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>> at
>> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>>
>>
>>
>> https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58
>>
>>
>>
>>
>>
>> I’m not sure how to debug this further, but it seems like an internal
>> Flink bug?
>>
>>
>>
>> More info:
>>
>>
>>- Checkpoints are stored in S3 and I’m using the S3 connector
>>- Identical code has been running on Flink 1.11.x for months with no
>>issues
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Kelly
>>
>


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Khachatryan Roman
Hi Yun,

> b) With unaligned checkpoints enabled, the slower cases might happen if
> the downstream task processes very slowly.
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of its
source subtasks finishes.

> But since only the result partition part of the finished upstream needs
> to wait to be processed, the other part of
> the execution graph could still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the
EOPs are processed.

> Declining the RPC-triggered checkpoint would indeed simplify the
> implementation, but since currently by default the
> failed checkpoint would cause job failover, we might have some
> concerns about directly declining the checkpoint.
Not all declines cause job failure; in particular,
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

> Thus another possible option might be to let the upstream task wait till
> all the pending buffers in the result partition have been flushed before
> it finishes.
This is what I meant by "postpone JM notification from source". Just
blocking the task thread wouldn't add much complexity, though I'm not sure
if it would cause any problems.

> do you think it would be ok for us to view it as an optimization and
> postpone it to future versions?
I think that's a good idea.

Regards,
Roman


On Mon, Jan 11, 2021 at 11:03 AM Yun Gao  wrote:

>   Hi Roman,
>
>   Many thanks for the feedback!
>
>
> > Probably it would be simpler to just decline the RPC-triggered
> checkpoint
> > if not all inputs of this task are finished (with
> CHECKPOINT_DECLINED_TASK_NOT_READY).
>
> > But I wonder how significantly this waiting for EoP from every
> input will delay performing the first checkpoint
> > by B after becoming a new source. This may in turn impact
> exactly-once sinks and incremental checkpoints.
> > Maybe a better option would be to postpone JM notification from
> source until its EoP is consumed?
>
>    I also agree that there could indeed be cases where the checkpoint
> gets slower since it cannot skip the data in the result partition of the
> finished upstream task:
>    a) For aligned checkpoints, this would not happen since the downstream
> tasks always need to process the buffers in order.
>    b) With unaligned checkpoints enabled, the slower cases might happen if
> the downstream task processes very slowly.
>
>    But since only the result partition of the finished upstream task needs
> to wait to be processed, the other parts of the execution graph could still
> perform the unaligned checkpoint normally. I think the average delay would
> be much lower than with a completely aligned checkpoint, but there would
> still be extremely bad cases where the delay is long.
>
>    Declining the RPC-triggered checkpoint would indeed simplify the
> implementation, but since by default a failed checkpoint currently causes
> job failover, we have some concerns about directly declining the
> checkpoint.
>    As for postponing the JM notification: since the JM currently cannot
> know whether the task has received all the EndOfPartition events from the
> upstream tasks, we might need to introduce a new RPC for notifying the
> state, and since the triggering is not atomic, we may also run into
> synchronization issues between JM and TM, which would introduce some
> complexity.
>
>    Thus another possible option might be to let the upstream task wait
> until all the pending buffers in the result partition have been flushed
> before it finishes. We could do the wait only for the
> PipelineResultPartition so it won't affect batch jobs. With the waiting,
> the unaligned checkpoint could continue to trigger the upstream task and
> skip the buffers in the result partition. Since the result partition state
> would be kept within the operator state of the last operator, after
> failover we would find that the last operator has a non-empty state and we
> would restart the tasks containing this operator to resend the snapshotted
> buffers. Of course this would also introduce some complexity, and since
> the probability of a long delay would be lower than in the completely
> aligned case, do you think it would be ok for us to view it as an
> optimization and postpone it to future versions?
>
>  Best,
>  Yun
>
>
>
> --
> From:Khachatryan Roman 
> Send Time:2021 Jan. 11 (Mon.) 05:46
> To:Yun Gao 
> Cc:Arvid Heise ; dev ; user <
> user@flink.apache.org>
> Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> Finished
>
> Thanks a lot for your answers Yun,
>
> > In detail, 

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-10 Thread Khachatryan Roman
Thanks a lot for your answers Yun,

> In detail, suppose we have a job with the graph A -> B -> C, and suppose
> that in one checkpoint A has reported FINISHED. The CheckpointCoordinator
> would choose B as the new "source" to trigger the checkpoint via RPC. If
> task B receives the checkpoint trigger, it knows that all its preceding
> tasks are finished; it would then wait until all the InputChannels have
> received EndOfPartition from the network (namely inputChannel.onBuffer()
> is called with EndOfPartition) and then take a snapshot of the input
> channels, as normal unaligned checkpoints do on the InputChannel side.
> Then we would be able to ensure the finished tasks always have an empty
> state.

Probably it would be simpler to just decline the RPC-triggered checkpoint
if not all inputs of this task are finished (with
CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will
delay performing the first checkpoint by B after becoming a new source.
This may in turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source
until its EoP is consumed?

Regards,
Roman


On Thu, Jan 7, 2021 at 5:01 PM Yun Gao  wrote:

> Hi Roman,
>
>    Many thanks for the feedback! I'll try to answer the issues inline:
>
> > 1. Option 1 is said to be not preferable because it wastes resources and
> adds complexity (new event).
> > However, the resources would be wasted for a relatively short time
> until the job finishes completely.
> > And compared to other options, complexity seems much lower. Or are
> differences in task completion times so huge and so common?
>
> There might be mixed jobs with both bounded and unbounded sources; in
> this case, the resources for the finished part of the job could not be
> released.
>
> Option 1 also complicates the semantics of EndOfPartition: if we hold
> the tasks and still need to notify the following tasks that all records
> have been sent, we would have to introduce some kind of
> pre-EndOfPartition message, which is similar to the current
> EndOfPartition but does not cause the channels to be released.
>
> > 2. I think it would be helpful to describe how rescaling is handled in
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).
>
> For Options 2 and 3 we manage the state per operator, thus the process
> of rescaling would be the same as with a normal checkpoint.
> For example, suppose an operator resides in a task with parallelism 4.
> If 2 of the subtasks are finished, the state of the operator is now
> composed of the state of the 2 remaining subtask instances. If we rescale
> to 5 after failover, the state of the 2 previously remaining subtasks
> would be re-distributed to the 5 new subtasks.
>
> If all 4 subtasks are finished before failover, the operator would be
> marked as finished. After failover the operator would still be marked as
> finished, and all the subtask instances of this operator would skip all
> the methods like open(), endOfInput(), close() and would be excluded when
> taking checkpoints after failover.
>
>
> > 3. Option 3 assumes that the state of a finished task is not used.
> That's true for operator state, but what about channel state (captured by
> unaligned checkpoint)?
> > I think it still has to be sent downstream which invalidates this Option.
>
> For unaligned checkpoints, if a subtask is marked as finished in one
> checkpoint, then its descendant tasks would wait until all the records
> have been received from the finished tasks before taking the checkpoint.
> Thus in this case we would not have result partition state, but only
> channel state for the downstream tasks that are still running.
>
> In detail, suppose we have a job with the graph A -> B -> C, and suppose
> that in one checkpoint A has reported FINISHED. The CheckpointCoordinator
> would choose B as the new "source" to trigger the checkpoint via RPC. If
> task B receives the checkpoint trigger, it knows that all its preceding
> tasks are finished; it would then wait until all the InputChannels have
> received EndOfPartition from the network (namely inputChannel.onBuffer()
> is called with EndOfPartition) and then take a snapshot of the input
> channels, as normal unaligned checkpoints do on the InputChannel side.
> Then we would be able to ensure the finished tasks always have an empty
> state.
>
> I'll also update the FLIP to make it clearer.
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Khachatryan Roman 
> *Send Date:*Thu Jan 7 21:55:52 2021
> *Recipients:*Arvid H

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Khachatryan Roman
Thanks for starting this discussion (and sorry for probably duplicated
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and
adds complexity (new event).
However, the resources would be wasted for a relatively short time until
the job finishes completely.
And compared to other options, complexity seems much lower. Or are
differences in task completion times so huge and so common?

2. I think it would be helpful to describe how rescaling is handled in
Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's
true for operator state, but what about channel state (captured by
unaligned checkpoint)? I think it still has to be sent downstream which
invalidates this Option.

Regards,
Roman


On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>
>
> I think we are mixing two different things here that may require different
> solutions:
> 1. Tasks (=sink) that may need to do something with the final checkpoint.
> 2. Tasks that only finish after having finished operations that do not
> depend on data flow (async I/O, but I could also think of some timer
> actions in process functions).
>
> Your proposal would help most for the first case. The second case can be
> solved entirely with current methods without being especially complicated:
> - EOP is only emitted once Async I/O is done with all background tasks
> - All timers are fired in a process function (I think we rather want to
> fire immediately on EOP but that's a different discussion)
> The advantage of this approach over your idea is that you don't need to
> wait for a checkpoint to complete to check for finalization.
>
> Now let's look at the first case. I see two alternatives:
> - The new sink interface implicitly incorporates this listener. Since I
> don't see a use case outside sinks, we could simply add this method to the
> new sink interface.
> - We implicitly assume that a sink is done after having a successful
> checkpoint at the end. Then we just need a tag interface
> `RequiresFinalization`. It also feels like we should add the property
> `final` to checkpoint options to help the sink detect that this is the last
> checkpoint to be taken. We could also try to always have the final
> checkpoint without tag interface on new sinks...
>
> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek 
> wrote:
>
>> This is somewhat unrelated to the discussion about how to actually do
>> the triggering when sources shut down, I'll write on that separately. I
>> just wanted to get this quick thought out.
>>
>> For letting operators decide whether they actually want to wait for a
>> final checkpoint, which is relevant at least for Async I/O and
>> potentially for sinks.
>>
>> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>>
>> This way we would decouple that logic from things that don't actually
>> need it. What do you think?
>>
>> Best,
>> Aljoscha
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>


Re: Snowflake access through JDBC

2020-12-18 Thread Khachatryan Roman
Hello,

Unfortunately, this driver is not currently supported by the Table API [1].
You can implement a dialect for it [2] and construct JdbcTableSource [3]
manually.

Alternatively, you can switch to the DataStream API and use JdbcInputFormat
[4] which doesn't require dialect.

I'm also pulling in Jingsong Li and Jark Wu as they might know better.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html

[2]
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.html
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.html
[4]
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcInputFormat.html

Regards,
Roman


On Fri, Dec 18, 2020 at 4:55 PM Abhishek Rai  wrote:

> Hello,
>
> I'm trying to create a `StreamTableSource` for Snowflake using
> `JdbcTableSourceSinkFactory.createStreamTableSource` (in package
> org.apache.flink.connector.jdbc.table) but it fails with the following
> error message due to `JdbcDialects` not having a dialect for
> Snowflake.
>
> My goal is to fully read a Snowflake table through Flink.
>
> Is there any way to work around this?
>
> ```
> java.lang.IllegalStateException: Cannot handle such jdbc url:
> jdbc:snowflake://abc123.us-east-1.snowflakecomputing.com/?db=TEST
>   at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at
> org.apache.flink.table.descriptors.JdbcValidator.validateCommonProperties(JdbcValidator.java:79)
>   at
> org.apache.flink.table.descriptors.JdbcValidator.validate(JdbcValidator.java:64)
>   at
> org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory.getValidatedProperties(JdbcTableSourceSinkFactory.java:173)
>   at
> org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory.createStreamTableSource(JdbcTableSourceSinkFactory.java:138)
> ```
>
> Thanks,
> Abhishek
>


Re: Flink eventTime issue

2020-12-18 Thread Khachatryan Roman
Hi Jiazhi,

Could you share table definitions and both queries?

Regards,
Roman


On Fri, Dec 18, 2020 at 4:39 AM ゞ野蠻遊戲χ  wrote:

> Hi all
>  When I use SQL with a UDTF and call the tableEnv.sqlQuery() method,
> the following error is thrown: "Rowtime attributes must not be in the
> input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before." I applied the
> to_timestamp function to eventTime, but it doesn't work. How can I solve
> this problem?
>
> sql: select
>   tmp.metric_id as metric_id,
>   tmp.metric_config as metric_config,
>   startLat,
>   destName,
>   bizType,
>   orderId,
>   completedTime,
>   orderStatus,
>   startHexList,
>   cityId,
>   type,
>   destLat,
>   endHexList,
>   destLng,
>   createTime,
>   passengerId,
>   finishedTime,
>   vehicleId,
>   startLng,
>   startName,
>   eventTime
> from
>   htw_order_dwd_htw_order_geo_Infos,
>   lateral table(
> metricUdtf('aa')
>   ) as tmp(metric_id, metric_config)
>
> Thanks
> Jiazhi
>


Re: flink sql read hive table throw java.lang.ArrayIndexOutOfBoundsException: 1024

2020-12-18 Thread Khachatryan Roman
The stacktrace looks similar to
https://issues.apache.org/jira/browse/HIVE-14483
However, it should be fixed in the version used in your setup.

Jingsong Li can you take a look at this error?

Regards,
Roman


On Thu, Dec 17, 2020 at 3:57 PM house-张浩 <312421...@qq.com> wrote:

> When I use PyFlink with Hive SQL to read data and insert it into ES, the
> following exception is thrown. The environment: flink 1.11.2
> flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar hive 3.1.2
>
> 2020-12-17 21:10:24,398 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - Source: HiveTableSource(driver_id, driver_base_lc_p1, 
> driver_90d_lc_p1, driver_30d_lc_p1, driver_14d_lc_p1, driver_180d_lc_p1, 
> vehicle_base_lc_p1, driver_active_zone, is_incremental, dt) TablePath: 
> algorithm.jiawei_oas_driver_features_for_incremental_hive2kafka, 
> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 8, 9] -> 
> Calc(select=[driver_id, is_incremental, dt, () AS bdi_feature_create_time]) 
> -> Sink: Sink(table=[default_catalog.default_database.0_demo4_903157246_tmp], 
> fields=[driver_id, is_incremental, dt, bdi_feature_create_time]) (1/1) 
> (98f4259c3d00fac9fc3482a4cdc8df3c) switched from RUNNING to FAILED.
> at 
> org.apache.orc.impl.ConvertTreeReaderFactory$AnyIntegerTreeReader.nextVector(ConvertTreeReaderFactory.java:445)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1300) 
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader.reachedEnd(HiveVectorizedOrcSplitReader.java:99)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> 2020-12-17 21:10:24,402 INFO  org.apache.flink.runtime.taskmanager.Task   
>  [] - Freeing task resources for Source: 
> HiveTableSource(driver_id, driver_base_lc_p1, driver_90d_lc_p1, 
> driver_30d_lc_p1, driver_14d_lc_p1, driver_180d_lc_p1, vehicle_base_lc_p1, 
> driver_active_zone, is_incremental, dt) TablePath: 
> algorithm.jiawei_oas_driver_features_for_incremental_hive2kafka, 
> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 8, 9] -> 
> Calc(select=[driver_id, is_incremental, dt, () AS bdi_feature_create_time]) 
> -> Sink: Sink(table=[default_catalog.default_database.0_demo4_903157246_tmp], 
> fields=[driver_id, is_incremental, dt, bdi_feature_create_time]) (1/1) 
> (98f4259c3d00fac9fc3482a4cdc8df3c).
> java.lang.ArrayIndexOutOfBoundsException: 1024
> at org.apache.flink.orc.shim.OrcShimV210.nextBatch(OrcShimV210.java:35) 
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.orc.shim.OrcShimV210.nextBatch(OrcShimV210.java:29) 
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.orc.OrcSplitReader.ensureBatch(OrcSplitReader.java:134) 
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.orc.impl.TreeReaderFactory$LongTreeReader.nextVector(TreeReaderFactory.java:612)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.orc.impl.TreeReaderFactory$TreeReader.nextVector(TreeReaderFactory.java:269)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.orc.impl.ConvertTreeReaderFactory$StringGroupFromAnyIntegerTreeReader.nextVector(ConvertTreeReaderFactory.java:1477)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:2012)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.orc.OrcSplitReader.reachedEnd(OrcSplitReader.java:101) 
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.reachedEnd(HiveTableInputFormat.java:261)
>  ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> 2020-12-17 21:10:24,406 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor
>
> So, how can I solve this problem?
>
>


Re: can taskmanager listen on all interfaces, like jobmanager?

2020-12-08 Thread Khachatryan Roman
I believe it was solved in 1.11 by FLINK-15911 [1]

I tried setting taskmanager.rpc.port to 1 for 1.12 and got
tcp6   0  0 :::1   :::*   LISTEN   13768/java

[1]
https://issues.apache.org/jira/browse/FLINK-15911

Regards,
Roman


On Tue, Dec 8, 2020 at 6:27 PM Barisa Obradovic  wrote:

> I've noticed that all jobmanager ports listen on all interfaces by
> default, as well as the data port on the taskmanager.
>
> The only exception is the taskmanager RPC port:
>
>
> ```
> bash-4.2$ netstat -lpn | grep 612
> tcp  0  0 172.20.54.176:6121  0.0.0.0:*  LISTEN  54/java
> tcp  0  0 0.0.0.0:6122        0.0.0.0:*  LISTEN  54/java
>
> This is Flink 1.10; I don't mind upgrading if newer versions of Flink can
> make the taskmanager listen on all interfaces.
>
> I've seen there are
>  taskmanager.host
>  and taskmanager.network.bind-policy
>
> But no matter what I input, I can't seem to get it to listen on all
> interfaces
> :(
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Recommendation about RocksDB Metrics ?

2020-12-08 Thread Khachatryan Roman
Hi Kien,

I am pulling in Yun who might know better.

Regards,
Roman


On Sun, Dec 6, 2020 at 3:52 AM Truong Duc Kien 
wrote:

> Hi all,
>
> We are thinking about enabling RocksDB metrics to better monitor our
> pipeline. However, since they will have performance impact, we will have to
> be selective about which metrics we use.
>
> Does anyone have experience about which metrics are more important than
> the others ?
>
> And what metrics have the largest performance impact ?
>
> Thanks,
> Kien
>


Re: what are the valid dimensions for lookup.cache.ttl?

2020-12-08 Thread Khachatryan Roman
Hi Marco,

You can find the list of the supported time units in TimeUtils javadoc [1]:
DAYS: "d", "day"
HOURS: "h", "hour"
MINUTES: "min", "minute"
SECONDS: "s", "sec", "second"
MILLISECONDS: "ms", "milli", "millisecond"
MICROSECONDS: "µs", "micro", "microsecond"
NANOSECONDS: "ns", "nano", "nanosecond"

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/util/TimeUtils.html#parseDuration-java.lang.String-
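
To make the list above concrete, here is a small stand-alone sketch of how
such labels map to durations (so minutes would be written as e.g. "10 min").
Only the unit labels are taken from the list; the class name and the parsing
logic are assumptions made for illustration and are not Flink's TimeUtils
implementation.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class DurationUnitsSketch {

    // Unit labels copied from the list above; everything else is illustrative.
    private static final Map<String, ChronoUnit> UNITS = new HashMap<>();

    static {
        register(ChronoUnit.DAYS, "d", "day");
        register(ChronoUnit.HOURS, "h", "hour");
        register(ChronoUnit.MINUTES, "min", "minute");
        register(ChronoUnit.SECONDS, "s", "sec", "second");
        register(ChronoUnit.MILLIS, "ms", "milli", "millisecond");
        register(ChronoUnit.MICROS, "\u00b5s", "micro", "microsecond");
        register(ChronoUnit.NANOS, "ns", "nano", "nanosecond");
    }

    private static void register(ChronoUnit unit, String... labels) {
        for (String label : labels) {
            UNITS.put(label, unit);
        }
    }

    /** Parses strings such as "10 min" or "30s" into a Duration. */
    public static Duration parse(String text) {
        String trimmed = text.trim().toLowerCase(Locale.US);
        int pos = 0;
        while (pos < trimmed.length() && Character.isDigit(trimmed.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number found in: " + text);
        }
        long value = Long.parseLong(trimmed.substring(0, pos));
        String label = trimmed.substring(pos).trim();
        ChronoUnit unit = UNITS.get(label);
        if (unit == null) {
            // This sketch requires an explicit unit label.
            throw new IllegalArgumentException("Unknown time unit: '" + label + "'");
        }
        return Duration.of(value, unit);
    }

    public static void main(String[] args) {
        System.out.println(parse("10 min"));
        System.out.println(parse("30s"));
    }
}
```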

Regards,
Roman


On Tue, Dec 8, 2020 at 4:04 PM Marco Villalobos 
wrote:

> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html
> there are no allowable units specified for lookup.cache.ttl.
>
> Can somebody please provide a list of valid  values and their meaning?  I
> know 's' for seconds is supported.  How do I specify minutes?
>
>


Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Khachatryan Roman
Hi Marco,

Yes, if TTL is not configured then the state will never expire (will stay
forever until deleted explicitly).

Regards,
Roman


On Tue, Dec 8, 2020 at 5:09 PM Marco Villalobos 
wrote:

> After reading
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html
>
> It is unclear to me how long keyed state will exist if it has no TTL.
> Is it cached forever, unless explicitly cleared or overwritten?
>
> Can somebody please explain this to me?
>
> Thank you.
>


Re: Event time issues when Kinesis consumer receives batched messages

2020-12-08 Thread Khachatryan Roman
Thanks, Randal,

Yes, I think the only way is to partition the stream the same way as
kinesis does (as I wrote before).

Regards,
Roman


On Tue, Dec 8, 2020 at 1:38 PM Randal Pitt  wrote:

> Hi Roman,
>
> We're using a custom watermarker that uses a histogram to calculate a "best
> fit" event time as the data we receive can be very unordered.
>
> As you can see we're using the timestamp from the first event in the batch,
> so we're essentially sampling the timestamps rather than using them all.
>
> FlinkKinesisConsumer> consumer = new
> FlinkKinesisConsumer<>(...);
>
> consumer.setPeriodicWatermarkAssigner(
> new HistogramWatermarker<>(Time.minutes(30), 100) {
> @Override
> public long extractTimestamp(final Batch element) {
> return element.getBatch().get(0).getDate().getTime();
> }
> }
> );
>
> Cheers,
> Randal.
>
>
>
>


Re: Event time issues when Kinesis consumer receives batched messages

2020-12-08 Thread Khachatryan Roman
Hi Randal,

Can you share the code for the 1st approach
(FlinkKinesisConsumer.setPeriodicWatermarkAssigner())?
I think the 2nd approach (flatMap) can be improved by partitioning the
stream the same way kinesis does (i.e. same partition key).
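
For context, the problem with the 1st approach described in the quoted
message below (all events in one message sharing a timestamp) can be shown
with plain arithmetic. This is a minimal sketch with made-up names and
simplified tumbling-window math, not Flink's window assigner; it also
assumes, for illustration only, that the shared timestamp is the one of the
first event in the message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchTimestampSketch {

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Every event inherits the timestamp of the first event in the message. */
    public static List<Long> windowsUsingBatchTimestamp(List<Long> eventTimestamps, long windowSizeMs) {
        long batchTimestamp = eventTimestamps.get(0);
        List<Long> windows = new ArrayList<>();
        for (int i = 0; i < eventTimestamps.size(); i++) {
            windows.add(windowStart(batchTimestamp, windowSizeMs));
        }
        return windows;
    }

    /** Every event keeps its own timestamp, as after flatMap plus timestamp assignment. */
    public static List<Long> windowsUsingEventTimestamps(List<Long> eventTimestamps, long windowSizeMs) {
        List<Long> windows = new ArrayList<>();
        for (long timestamp : eventTimestamps) {
            windows.add(windowStart(timestamp, windowSizeMs));
        }
        return windows;
    }

    public static void main(String[] args) {
        // One message carrying three events, with one-minute tumbling windows.
        List<Long> batch = Arrays.asList(59_000L, 61_000L, 125_000L);
        System.out.println(windowsUsingBatchTimestamp(batch, 60_000L));
        System.out.println(windowsUsingEventTimestamps(batch, 60_000L));
    }
}
```

With a shared timestamp all three events land in the window of the first
one, while per-event timestamps spread them over three different windows.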

Regards,
Roman


On Mon, Dec 7, 2020 at 2:44 PM Randal Pitt  wrote:

> Hi there,
>
> We're using Flink to read from a Kinesis stream. The stream contains
> messages that themselves contain lists of events and we want our Flink jobs
> (using the event time characteristic) to process those events individually.
> We have this working using flatMap in the DataStream but we're having
> trouble correctly assigning timestamps to the events.
>
> We have been using FlinkKinesisConsumer.setPeriodicWatermarkAssigner() as
> that should mean the watermarks are generated correctly, but it results in
> all events in one message sharing a timestamp, resulting in some events
> being assigned to the wrong window.
>
> Using DataStream.assignTimestampsAndWatermarks() after the flatMap means we
> can assign the correct timestamps, but the watermarks may not necessarily be
> correct with respect to the Kinesis shards.
>
> Is there a strategy we can use that gets us both watermarks from the
> Kinesis consumer and correct timestamps for individual events?
>
> Best regards,
>
> Randal.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink 1.11.2 could not create kafka table source on EMR.

2020-11-17 Thread Khachatryan Roman
Hi,

Please verify that:
1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf
your-program.jar | grep KafkaDynamicTableFactory")
2. kafka-connector version matches the version of Flink distribution on EMR.
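
If you prefer to check the jar programmatically, the sketch below reads a
service file out of a jar and lists the factories registered in it. One
assumption worth verifying against your Flink version: as far as I know, the
'connector'='kafka' style of options is resolved through the newer factory
stack, which is discovered via
META-INF/services/org.apache.flink.table.factories.Factory and not via the
...factories.TableFactory file mentioned in the question. The class and method
names here are made up, and sampleJar only exists so the check can be tried
without a real fat jar.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

// Hypothetical helper, not part of Flink.
class ServiceFileCheck {

    // Returns the non-comment lines of META-INF/services/<serviceInterface> in the jar,
    // or an empty list if the jar has no such entry.
    static List<String> registeredFactories(Path jar, String serviceInterface) {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            List<String> names = new ArrayList<>();
            JarEntry entry = jarFile.getJarEntry("META-INF/services/" + serviceInterface);
            if (entry == null) {
                return names;
            }
            try (InputStream in = jarFile.getInputStream(entry)) {
                String content = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                for (String line : content.split("\\R")) {
                    String trimmed = line.trim();
                    if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                        names.add(trimmed);
                    }
                }
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a throwaway jar containing a single service file (for trying the check out).
    static Path sampleJar(String serviceInterface, String... implementations) {
        try {
            Path jar = Files.createTempFile("sample", ".jar");
            try (OutputStream out = Files.newOutputStream(jar);
                 JarOutputStream jos = new JarOutputStream(out)) {
                jos.putNextEntry(new JarEntry("META-INF/services/" + serviceInterface));
                jos.write(String.join("\n", implementations).getBytes(StandardCharsets.UTF_8));
                jos.closeEntry();
            }
            return jar;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String service = "org.apache.flink.table.factories.Factory";
        // Pass the path of your fat jar as the first argument; otherwise a sample jar is used.
        Path jar = args.length > 0
                ? Path.of(args[0])
                : sampleJar(service, "org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory");
        System.out.println(registeredFactories(jar, service));
    }
}
```

If the list printed for your fat jar does not contain the Kafka factory, the
service files were probably overwritten instead of merged while building the
jar.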

Regards,
Roman


On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu  wrote:

> Hi,
>
> I could not launch my flink 1.11.2 application on EMR with exception
>
> Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
>
> I attached the full log at the end. I checked some other threads, but none
> applies in my case. Here are my observations:
>
> 1. dependency check: both flink-connector-kafka and flink-json are
> included in the final fat jar.
> 2.
> resources/META-INF/services/org.apache.flink.table.factories.TableFactory
> has the following and is included in the final fat jar.
>   - org.apache.flink.formats.json.JsonRowFormatFactory
>   - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>   I also noticed that only the identifier datagen is shown in the log. No
> kafka or json in there.
> 3. local IntelliJ running fine.
> 4. same jar on EMR not working
>
> Please advise.
> Thanks,
> Fanbin
>
>
>
>
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a source for reading table
> 'default_catalog.default_database.analytics_service'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'json.ignore-parse-errors'='true'
> 'properties.bootstrap.servers'='localhost:9093'
> 'properties.group.id'='xxx'
> 'properties.security.protocol'='SSL'
> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
> 'properties.ssl.key.password'='secret'
> 'properties.ssl.keystore.location'='xxx.jks'
> 'properties.ssl.keystore.password'='secret'
> 'properties.ssl.keystore.type'='JKS'
> 'properties.ssl.truststore.location'='xxx.jks'
> 'properties.ssl.truststore.password'='secret'
> 'properties.ssl.truststore.type'='JKS'
> 'properties.zookeeper.connect'='localhost:2181'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='events'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
> at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
> at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
> at
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
> at
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> 

Re: why not flink delete the checkpoint directory recursively?

2020-11-17 Thread Khachatryan Roman
Hi,

I think Robert is right, state handles are deleted first, and then the
directory is deleted non-recursively.
If any exception occurs while removing the files, it will be combined with
the other exception (as suppressed).
So probably Flink failed to delete some files and then directory removal
failed because of that.
Can you share the full exception to check this?
And probably check what files exist there as Robert suggested.
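
To illustrate the order of operations, here is a small sketch using the local
file system and java.nio instead of HDFS. It is only an analogy, not Flink's
actual code: the known files are deleted best-effort, then the directory is
deleted non-recursively, and any later failure is attached to the first one as
suppressed. All names are made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical analogy on the local file system; not Flink's implementation.
class NonRecursiveDiscard {

    // Deletes the known files best-effort, then the directory itself (non-recursively).
    // Returns null on success, otherwise the first failure with later ones as suppressed.
    static Exception discard(Path dir, List<Path> knownFiles) {
        Exception failure = null;
        for (Path file : knownFiles) {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                if (failure == null) failure = e; else failure.addSuppressed(e);
            }
        }
        try {
            Files.delete(dir); // fails if anything is still left in the directory
        } catch (IOException e) {
            if (failure == null) failure = e; else failure.addSuppressed(e);
        }
        return failure;
    }

    // Creates a temp directory with two tracked files and, optionally, one untracked
    // leftover file, then discards it.
    static Exception demo(boolean withLeftover) {
        try {
            Path dir = Files.createTempDirectory("chk-");
            Path a = Files.createFile(dir.resolve("state-a"));
            Path b = Files.createFile(dir.resolve("state-b"));
            if (withLeftover) {
                Files.createFile(dir.resolve("untracked"));
            }
            return discard(dir, List.of(a, b));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

The second call fails on the directory because of the file nobody tracked,
which is the kind of leftover worth looking for in the checkpoint directory.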

Regards,
Roman


On Tue, Nov 17, 2020 at 10:38 AM Joshua Fan  wrote:

> Hi Robert,
>
> When the recursive argument of `delete(Path f, boolean recursive)` is false,
> HDFS throws an exception like the one below:
> [image: checkpoint-exception.png]
>
> Yours sincerely
> Josh
>
> On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger 
> wrote:
>
>> Hey Josh,
>>
>> As far as I understand the code CompletedCheckpoint.discard(), Flink is
>> removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then
>> deleting the directory.
>>
>> Which files are left over in your case?
>> Do you see any exceptions on the TaskManagers?
>>
>> Best,
>> Robert
>>
>> On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan 
>> wrote:
>>
>>> Hi
>>>
>>> When a checkpoint should be deleted,
>>> FsCompletedCheckpointStorageLocation.disposeStorageLocation will be
>>> called.
>>> Inside it, fs.delete(exclusiveCheckpointDir, false) performs the delete.
>>> Why is the recursive parameter set to false, given that
>>> exclusiveCheckpointDir really is a directory? On our Hadoop cluster, this
>>> means the checkpoint cannot be removed.
>>> It is easy to change the recursive parameter to true, but is there any
>>> potential harm?
>>>
>>> Yours sincerely
>>> Josh
>>>
>>>


Re: How to convert Int to Date

2020-11-17 Thread Khachatryan Roman
Hello,

Do both of the types you use have the same nullability?
For a primitive int, the documentation you referred to says: "Output only
if type is not nullable".
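
For the value conversion itself (integer days since epoch to a date), plain
java.time is enough. The sketch below is not Flink-specific; where you apply
it (for example inside a scalar function of your own) is up to you, and the
class and method names are made up.

```java
import java.time.LocalDate;

// Hypothetical helper showing only the value conversion, independent of Flink.
class EpochDayDemo {

    // Converts a "days since 1970-01-01" integer into a calendar date.
    static LocalDate toDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // Converts a calendar date back into "days since 1970-01-01".
    static int toEpochDay(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        System.out.println(toDate(0));
        System.out.println(toDate(18583));
        System.out.println(toEpochDay(LocalDate.of(2020, 11, 17)));
    }
}
```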

Regards,
Roman


On Tue, Nov 17, 2020 at 7:49 AM Rex Fenley  wrote:

> Hello,
>
> I'm using the Table API and I have a column which is an integer day since
> epoch. According to the docs [1] both `int` and `java.lang.Integer` are
> acceptable for DATE. However, if I try to use the SQL API to write a DATE
> out to the Elasticsearch connector for the INT column I receive an
> exception. How then should I go about converting to DATE?
>
> Exception:
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink
> default_catalog.default_database.sink_es_people do not match.
> Query schema: [... column: INT, ...]
> Sink schema: [... column: DATE, ...]
>
> I know this column is the culprit because when I make it INT on both ends
> it works.
>
> How do I go about making my INT a DATE?
>
> Thanks!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#date-and-time
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Re: IllegalStateException Printing Plan

2020-11-17 Thread Khachatryan Roman
Hello,

Can you share the full program?
getExecutionPlan call is probably misplaced.

Regards,
Roman


On Tue, Nov 17, 2020 at 8:26 AM Rex Fenley  wrote:

> Hello,
>
> I have the following code attempting to print the execution plan for my
> job locally. The job runs fine and Flink UI displays so I'd expect this to
> work.
>
> val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE)
> println(s"execution plan:\n${this.env.getExecutionPlan()}")
>
> but instead I end up with
>
> Caused by: java.lang.IllegalStateException: No operators defined in
> streaming topology. Cannot execute.
>
> What am I doing wrong?
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Re: NoResourceAvailableException

2020-10-28 Thread Khachatryan Roman
Hi Alexander,

Thanks for sharing.

I see a lot of exceptions in the logs, particularly:
Caused by: java.net.BindException: Could not start actor system on any
port in port range 6123

This means that there is probably more than one instance running, which is
likely the root cause.
So it makes sense to make sure that the previous attempts were cleaned up.
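
For reference, this is how the same kind of BindException can be reproduced in
isolation: a second listener on a port that is still held by a first one is
rejected. It is a plain-Java sketch on the loopback interface with an
ephemeral port, not Flink code, and the names are made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical demonstration of "port already in use"; unrelated to Flink's own startup code.
class PortInUseDemo {

    // Returns true if a second listener on the same address and port is rejected.
    static boolean secondBindFails() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket first = new ServerSocket(0, 50, loopback)) {
            int port = first.getLocalPort(); // port picked by the OS for the first listener
            try (ServerSocket second = new ServerSocket(port, 50, loopback)) {
                return false; // unexpected: both listeners got the port
            } catch (BindException e) {
                return true; // the port is still held by the first listener
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second bind rejected: " + secondBindFails());
    }
}
```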

Regards,
Roman


On Tue, Oct 20, 2020 at 12:08 AM Alexander Semeshchenko 
wrote:

> Hi Roman,
> I set up a cluster: 1 master, 2 workers. Each has 8 CPUs and 32 GB RAM and
> runs Red Hat Enterprise Linux Server release 7.9 (Maipo).
> vsmart-f01 - master
> vsmart-f02 - worker
> vsmart-f03 - worker
> taskmanager.numberOfTaskSlots for each node is: 8
>
> Then:
> *[flink@vsmart-f01 flink-1.11.1]$ ./bin/start-cluster.sh *
> *Starting cluster.*
> *[INFO] 1 instance(s) of standalonesession are already running on
> vsmart-f01.*
> *Starting standalonesession daemon on host vsmart-f01.*
> *flink@10.92.194.19 's password: *
> *[INFO] 1 instance(s) of taskexecutor are already running on vsmart-f02.*
> *Starting taskexecutor daemon on host vsmart-f02.*
> *flink@10.92.194.20 's password: *
> *Starting taskexecutor daemon on host vsmart-f03.*
>
> The cluster started up, and I ran WordCount from the master:
> *./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount
>  ./examples/batch/WordCount.jar  --output file:/tmp/wordcount_out*
>
> After 5 minutes the job was canceled.
> The screenshot shows that no taskmanager was ever assigned to the job
> operator.
> I've put the 3 logs (one from each node) here.
>
> Thanks and Best Regards.
> Alex
>
>
> On Mon, Oct 19, 2020 at 5:47 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> This message isn't actually a problem - netty can't find the native
>> transports and falls back to nio-based one.
>> Does increasing taskmanager.numberOfTaskSlots in flink-conf.yaml help?
>> Can you share the full logs in DEBUG mode?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Oct 19, 2020 at 6:14 PM Alexander Semeshchenko 
>> wrote:
>>
>>> thank you for your response.
>>>
>>> The taskmanager has 1 slot, and that slot is free, but the WordCount job
>>> never changes its status from "Created".
>>> After more or less 5 minutes the job is canceled.
>>> I attached screenshot of taskmanager.
>>>
>>> Best Regards
>>> Alexander
>>>
>>> On Wed, Oct 14, 2020 at 6:13 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Thanks for sharing the details and sorry for the late reply.
>>>> You can check the number of free slots in the task manager in the web
>>>> UI (http://localhost:8081/#/task-manager by default).
>>>> Before running the program, there should be 1 TM with 1 slot available
>>>> which should be free (with default settings).
>>>>
>>>> If there are other jobs, you can increase slots per TM by increasing
>>>> taskmanager.numberOfTaskSlots in flink-conf.yaml [1].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-numberoftaskslots
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Wed, Oct 14, 2020 at 6:56 PM Alexander Semeshchenko <
>>>> as77...@gmail.com> wrote:
>>>>
>>>>> Hi, is there any news about my issue "Flink -
>>>>>  NoResourceAvailableException " post - installed WordCount job ?
>>>>> Best
>>>>>
>>>>> On Fri, Oct 9, 2020 at 10:19 AM Alexander Semeshchenko <
>>>>> as77...@gmail.com> wrote:
>>>>>
>>>>>> Yes, I took the following actions:
>>>>>> -   download Flink
>>>>>> -   ./bin/start-cluster.sh
>>>>>> -   ./bin/flink run ./examples/streaming/WordCount.jar
>>>>>>
>>>>>> Then I tried to increase the values for ulimit, VM memory, etc.
>>>>>> Below I put the log messages.
>>>>>>
>>>>>> It's strange, as I could run the same job on my MacBook (8 CPUs, 16 GB
>>>>>> RAM) and on a k8s cluster (4 CPUs, 8 GB RAM).
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 9, 2020 at 3:32 AM Khachatryan Roman

Re: Building Flink on VirtualBox VM failing

2020-10-28 Thread Khachatryan Roman
>> [ERROR]   mvn  -rf :flink-tests
>>
>>
>>
>> flink-tests/target/surefire-reports/2020-10-21T11-13-24_791-jvmRun1.dump
>>
>> # Created at 2020-10-21T12:03:51.559
>> java.io.IOException: Stream closed
>> at
>> java.base/java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:176)
>> at
>> java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:289)
>> at
>> java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
>> at
>> java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
>> at
>> java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
>> at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>> at
>> java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
>> at java.base/java.io.Reader.read(Reader.java:189)
>> at java.base/java.util.Scanner.readInput(Scanner.java:882)
>> at
>> java.base/java.util.Scanner.findWithinHorizon(Scanner.java:1796)
>> at java.base/java.util.Scanner.hasNextLine(Scanner.java:1610)
>> at
>> org.apache.maven.surefire.booter.PpidChecker$ProcessInfoConsumer.execute(PpidChecker.java:354)
>> at
>> org.apache.maven.surefire.booter.PpidChecker.unix(PpidChecker.java:190)
>> at
>> org.apache.maven.surefire.booter.PpidChecker.isProcessAlive(PpidChecker.java:123)
>> at
>> org.apache.maven.surefire.booter.ForkedBooter$2.run(ForkedBooter.java:214)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at
>> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>> at
>> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>>
>>
>> # Created at 2020-10-21T12:03:51.560
>> System.exit() or native command error interrupted process checker.
>> java.lang.IllegalStateException: error [STOPPED] to read process 935338
>> at
>> org.apache.maven.surefire.booter.PpidChecker.checkProcessInfo(PpidChecker.java:145)
>> at
>> org.apache.maven.surefire.booter.PpidChecker.isProcessAlive(PpidChecker.java:124)
>> at
>> org.apache.maven.surefire.booter.ForkedBooter$2.run(ForkedBooter.java:214)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at
>> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>> at
>> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>>
>>
>>
>> sudo less -n /var/log/kern.log
>> ..
>> Oct 21 12:21:57 ubuntu kernel: [24024.569633]
>> oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=/,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1000.slice/user@1000.service
>> ,task=java,pid=1220764,uid=1000
>> Oct 21 12:21:57 ubuntu kernel: [24024.569804] Out of memory: Killed
>> process 1220764 (java) total-vm:8514092kB, anon-rss:4116292kB,
>> file-rss:0kB, shmem-rss:0kB, UID:1000 pgtables:9136kB oom_score_adj:0
>> Oct 21 12:21:57 ubuntu kernel: [24024.685821] oom_reaper: reaped process
>> 1220764 (java), now anon-rss:0kB, file-rss:0kB, shmem-rss:0kB
>>
>> Regards,
>> Juha
>>
>> El mié., 21 oct. 2020 a las 10:04, Juha Mynttinen (<
>> juha.myntti...@gmail.com>) escribió:
>>
>>> Hi,
>>>
>>> You're right, I thought about this also after writing the last comment -
>>> for example on Linux, the Kernel by default overcommits memory allocations
>>> and this approach doesn't work (doesn't make JVM crash right when it
>>> starts).
>>>
>>> I dug a little deeper. It seems that for ci-environments there are
>>> specific compilation scripts such as
>>> https://github.com/apache/flink/blob/master/tools/ci/compile.sh#

Re: Un-ignored Parsing Exceptions in the CsvFormat

2020-10-26 Thread Khachatryan Roman
Hey Austin,

I assigned the ticket to you.
It would be great if you could fix it!
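
In case it helps with the fix, here is a toy sketch of the behaviour discussed
in the quoted thread below: a malformed row is skipped when the parser is
lenient and raises an error otherwise. This is not the RowCsvInputFormat code.
The parser, the "unbalanced quote" condition and all names are simplifying
assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy parser for illustration only; it does not mirror Flink's CSV parsing internals.
class LenientCsvSketch {

    // Splits a row on commas outside double quotes. A row with an unbalanced quote is
    // treated as malformed: null is returned if lenient, otherwise an exception is thrown.
    static List<String> parseRow(String row, boolean lenient) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < row.length(); i++) {
            char c = row.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (inQuotes) {
            if (lenient) {
                return null; // caller skips this row
            }
            throw new IllegalArgumentException("Unbalanced quote in row: " + row);
        }
        fields.add(current.toString());
        return fields;
    }

    // Parses all rows, dropping the ones that parseRow reported as malformed.
    static List<List<String>> parseAll(List<String> rows, boolean lenient) {
        List<List<String>> parsed = new ArrayList<>();
        for (String row : rows) {
            List<String> fields = parseRow(row, lenient);
            if (fields != null) {
                parsed.add(fields);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a,b", "\"x,y", "\"c,d\",e");
        System.out.println(parseAll(rows, true));
    }
}
```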

Regards,
Roman


On Thu, Oct 22, 2020 at 5:08 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Roman,
>
> Sorry to miss this -- thanks for the confirmation and making the ticket.
> I'm happy to propose a fix if someone is able to assign the ticket to me.
>
> Best,
> Austin
>
> On Mon, Oct 19, 2020 at 6:56 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hey Austin,
>>
>> I think you are right. The problematic row contains an odd number of
>> delimiters in which case skipFields will return -1, which in turn leads to
>> an exception.
>>
>> I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711
>> to fix it.
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV
>>> Format[1].
>>>
>>> Even with the `ignoreParseErrors()` set, the job fails when it
>>> encounters some types of malformed rows. The root cause is indeed a
>>> `ParseException`, so I'm wondering if there's anything more I need to do to
>>> ignore these rows. Each field in the schema is a STRING.
>>>
>>>
>>> I've configured the CSV format and table like so:
>>>
>>> tableEnv.connect(
>>> new FileSystem()
>>> .path(path)
>>> )
>>> .withFormat(
>>> new Csv()
>>> .quoteCharacter('"')
>>> .ignoreParseErrors()
>>> )
>>> .withSchema(schema)
>>> .inAppendMode()
>>>
>>>
>>> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a
>>> check to `isLenient()` if there is an unexpected parser position?[2]
>>>
>>> Example error:
>>>
>>> 2020-10-16 12:50:18
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception when processing split: null
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
>>> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
>>> parser position for column 1 of row '",
>>> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
>>> ""company,'
>>> at
>>> org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
>>> at
>>> org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>>> at
>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>>> at
>>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>>>
>>>
>>> Thanks,
>>> Austin
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
>>> [2]:
>>> https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206
>>>
>>


Re: Building Flink on VirtualBox VM failing

2020-10-20 Thread Khachatryan Roman
I think you are right and I like the idea of failing the build fast.
However, when trying this approach on my local machine it didn't help: the
build didn't crash (probably, because of overcommit).
Did you try this approach in your VM?
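
For reference, the arithmetic from your message can be written down as a small
sketch. It only looks at the max heap per fork (-Xmx), which is a lower bound
for what a JVM really uses, and the "memory-aware" fork count is just the
floor(RAM / 2GB) idea from your mail capped by the CPU count. I don't know
whether Maven can compute this on its own; the names are made up.

```java
// Hypothetical arithmetic helper; not part of Maven, Surefire or Flink.
class ForkCountDemo {

    // Worst-case heap demand if every fork grows to its -Xmx limit.
    static long worstCaseHeapMb(int forks, long maxHeapMb) {
        return forks * maxHeapMb;
    }

    // Fork count capped by both the CPU count and floor(RAM / max heap), never below 1.
    static int memoryAwareForks(int cpus, long ramMb, long maxHeapMb) {
        long byMemory = ramMb / maxHeapMb;
        return (int) Math.max(1, Math.min(cpus, byMemory));
    }

    public static void main(String[] args) {
        // 5 CPUs with -Xmx2048m each, on a VM with about 7961 MiB of RAM
        System.out.println(worstCaseHeapMb(5, 2048));
        System.out.println(memoryAwareForks(5, 7961, 2048));
        // 12 cores with 16 GB of RAM
        System.out.println(memoryAwareForks(12, 16384, 2048));
    }
}
```

With 5 forks the worst case is 10240 MB of heap against roughly 8 GB of RAM,
while the memory-aware count would be 3 forks on that VM.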

Regards,
Roman


On Tue, Oct 20, 2020 at 12:12 PM Juha Mynttinen 
wrote:

> Hey,
>
> > Currently, tests do not run in parallel
>
> I don't think this is entirely true. In 'top' it's clearly visible
> that there are multiple JVMs. If not running tests in parallel, what are
> these doing? In the main pom.xml there's configuration for the plug-in
> 'maven-surefire-plugin'.
>
> I'm not a Maven expert, but it looks to me like this: in
> https://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html
> it says "The other possibility for parallel test execution is setting the
> parameter forkCount to a value higher than 1". I think that's happening
> in Flink:
>
> <forkCount>${flink.forkCount}</forkCount>
>
> And
>
> <flink.forkCount>1C</flink.forkCount>
>
> This means there's gonna be 1 * count_of_cpus forks.
>
> And this one:
>
> <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
> -XX:+UseG1GC</argLine>
>
> In my case, I have 5 CPUs, so 5 forks. I think what now happens is that
> since each fork gets max 2048m heap, there's kind of mem requirement of CPU
> count * 2048 m. In my case, I have 8GB of mem, which is less than max 5 *
> 2048mb.
>
> This could be better. I think it's a completely valid computer that
> has RAM < count_of_cpus * 2048 mb, take e.g. AMD ryzen 3900X with 12 cores
> and put 16 GB of RAM there. At least memory & CPU requirements should be
> documented?
>
> If the tests really need 2GB of heap, then maybe the forkCount should be
> based on the available RAM rather than available cores, e.g. floor(RAM /
> 2GB)? I don't know if that's doable in Maven.
>
> I think an easy and non-intrusive improvement would be to change
> '-Xms256m' to '-Xms2048m' (ms to match mx) so that the JVM would allocate
> 2048mb right away (when it starts). If there's not enough memory, the tests
> would fail immediately (JVM couldn't start). The tests would probably fail
> anyway (my case), so better to fail fast.
>
> Regards,
> Juha
>
> El mar., 20 oct. 2020 a las 11:16, Khachatryan Roman (<
> khachatryan.ro...@gmail.com>) escribió:
>
>> Thanks for sharing this.
>> I think the OOM killer's activity indicates high memory pressure (it simply
>> kills the process with the highest memory consumption score).
>> High CPU usage may just be a consequence of that, due to constant GC.
>>
>> Currently, tests do not run in parallel, but high memory usage can be
>> caused by the nature of the tests (e.g. running Flink with high parallelism).
>> So I think the best way to deal with this is to use a VM with more memory.
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Oct 20, 2020 at 8:56 AM Juha Mynttinen 
>> wrote:
>>
>>> Hey,
>>>
>>> Good hint that /var/log/kern.log. This time I can see this:
>>>
>>> Oct 20 09:44:48 ubuntu kernel: [ 1925.651551]
>>> oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=/,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1000.slice/user@1000.service
>>> ,task=java,pid=270024,uid=1000
>>> Oct 20 09:44:48 ubuntu kernel: [ 1925.651632] Out of memory: Killed
>>> process 270024 (java) total-vm:9841596kB, anon-rss:4820380kB, file-rss:0kB,
>>> shmem-rss:0kB, UID:1000 pgtables:11780kB oom_score_adj:0
>>> Oct 20 09:44:48 ubuntu kernel: [ 1925.844155] oom_reaper: reaped process
>>> 270024 (java), now anon-rss:0kB, file-rss:0kB, shmem-rss:0kB
>>>
>>> The next question is why this happens. I'll try to dig deeper.
>>>
>>> About the CPU load: I have five CPUs. Theoretically it makes sense to
>>> run five tests at a time to max out the CPUs. However, when I look at what
>>> the five Java processes (that MVN forks) are doing, it can be seen that
>>> each of those processes has a large number of threads wanting to use CPU.
>>> Here's an example from 'top -H':
>>>
>>> top - 09:42:03 up 29 min,  1 user,  load average: 17,00, 12,86, 8,81
>>> Threads: 1099 total,  21 running, 1078 sleeping,   0 stopped,   0 zombie
>>> %Cpu(s): 90,5 us,  9,4 sy,  0,0 ni,  0,0 id,  0,0 wa,  0,0 hi,  0,1 si,  0,0 st
>>> MiB Mem :   7961,6 total,   1614,3 free,   4023,8 used,   2323,5 buff/cache
>>> MiB Swap:   2048,0 total,   2047,0 free,  1,0 used.   3638,9 avail Mem
>>>
>>> PID USER PR NI VIRT RES SHR S

Re: HA on AWS EMR

2020-10-20 Thread Khachatryan Roman
Hello Averell,

I don't think ZK data is stored on a master node. And Flink JM data is
usually stored on a DFS, at the location given by
"high-availability.storageDir" [1].

In either case, for Flink to be HA, Yarn should also be HA. And I think
this is not the case with a single master node. Please consider a
multi-master EMR setup [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#high-availability-storagedir
[2] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-ha.html

Regards,
Roman


On Tue, Oct 20, 2020 at 12:13 AM Averell  wrote:

> Hello Roman,
>
> Thanks for your time.
> I'm using EMR 5.30.1 (Flink 1.10.0) with 1 master node.
> /yarn.application-attempts/ is not set (does that mean unlimited?), while
> /yarn.resourcemanager.am.max-attempts/ is 4.
>
> By "EMR cluster crashed" I meant the cluster is lost. Some scenarios
> which could lead to this are:
>   - The master node is down
>   - The cluster is accidentally / deliberately terminated.
>
> I found a thread in our mailing list [1], in which Fabian mentioned a
> /"pointer"/ stored in Zookeeper. It looks like this piece of information is
> stored in Zookeeper's dataDir, which is by default stored in the local
> storage of the EMR's master node. I'm trying to move this one to an EFS, in
> hope that it would help. Not sure whether this is a right approach.
>
> Thanks for your help.
> Regards,
> Averell
>
>
> [1]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Building Flink on VirtualBox VM failing

2020-10-20 Thread Khachatryan Roman
Thanks for sharing this.
I think the OOM killer's activity indicates high memory pressure (it simply
kills the process with the highest memory consumption score).
High CPU usage may just be a consequence of that, due to constant GC.

Currently, tests do not run in parallel, but high memory usage can be
caused by the nature of the tests (e.g. running Flink with high parallelism).
So I think the best way to deal with this is to use a VM with more memory.

Regards,
Roman


On Tue, Oct 20, 2020 at 8:56 AM Juha Mynttinen 
wrote:

> Hey,
>
> Good hint that /var/log/kern.log. This time I can see this:
>
> Oct 20 09:44:48 ubuntu kernel: [ 1925.651551]
> oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=/,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1000.slice/user@1000.service
> ,task=java,pid=270024,uid=1000
> Oct 20 09:44:48 ubuntu kernel: [ 1925.651632] Out of memory: Killed
> process 270024 (java) total-vm:9841596kB, anon-rss:4820380kB, file-rss:0kB,
> shmem-rss:0kB, UID:1000 pgtables:11780kB oom_score_adj:0
> Oct 20 09:44:48 ubuntu kernel: [ 1925.844155] oom_reaper: reaped process
> 270024 (java), now anon-rss:0kB, file-rss:0kB, shmem-rss:0kB
>
> The next question is why this happens. I'll try to dig deeper.
>
> About the CPU load: I have five CPUs. Theoretically it makes sense to run
> five tests at a time to max out the CPUs. However, when I look at what the
> five Java processes (that MVN forks) are doing, it can be seen that each of
> those processes has a large number of threads wanting to use CPU. Here's
> an example from 'top -H':
>
> top - 09:42:03 up 29 min,  1 user,  load average: 17,00, 12,86, 8,81
> Threads: 1099 total,  21 running, 1078 sleeping,   0 stopped,   0 zombie
> %Cpu(s): 90,5 us,  9,4 sy,  0,0 ni,  0,0 id,  0,0 wa,  0,0 hi,  0,1 si,  0,0 st
> MiB Mem :   7961,6 total,   1614,3 free,   4023,8 used,   2323,5 buff/cache
> MiB Swap:   2048,0 total,   2047,0 free,  1,0 used.   3638,9 avail Mem
>
>     PID USER  PR  NI    VIRT    RES   SHR S  %CPU  %MEM   TIME+ COMMAND
>  254825 juha  20   0 4250424 195768 27596 R  20,9   2,4 0:01.41 C2 CompilerThre
>  255116 juha  20   0 2820448  99240 27488 R  20,3   1,2 0:00.78 java
>  254968 juha  20   0 5312696 125212 27716 R  19,9   1,5 0:01.16 java
>  255027 juha  20   0 5310648 108716 27496 R  19,9   1,3 0:00.90 java
>  255123 juha  20   0 2820448  99120 27420 R  19,3   1,2 0:00.78 java
>  254829 juha  20   0 4240356 184376 27792 R  17,9   2,3 0:01.26 C2 CompilerThre
>  253993 juha  20   0 6436132 276808 28000 R  17,6   3,4 0:02.47 C2 CompilerThre
>  254793 juha  20   0 4250424 195768 27596 R  17,3   2,4 0:01.76 java
>  254801 juha  20   0 4240356 184376 27792 R  16,3   2,3 0:01.67 java
>  254298 juha  20   0 6510340 435360 28212 R  15,6   5,3 0:02.82 C2 CompilerThre
>  255145 juha  20   0 2820448  99240 27488 S  15,6   1,2 0:00.51 C2 CompilerThre
>  255045 juha  20   0 5310648 108716 27496 R  15,3   1,3 0:00.62 C2 CompilerThre
>  255151 juha  20   0 2820448  99120 27420 S  14,0   1,2 0:00.47 C2 CompilerThre
>  254986 juha  20   0 5312696 125212 27716 R  12,6   1,5 0:00.76 C2 CompilerThre
>  253980 juha  20   0 6436132 276808 28000 S  11,6   3,4 0:02.63 java
>  255148 juha  20   0 2820448  99240 27488 S  10,6   1,2 0:00.39 C1 CompilerThre
>  255154 juha  20   0 2820448  99120 27420 S   9,6   1,2 0:00.37 C1 CompilerThre
>  254457 juha  20   0 4269900 218036 28236 R   9,3   2,7 0:02.22 C2 CompilerThre
>  254299 juha  20   0 6510340 435360 28212 S   8,6   5,3 0:01.30 C1 CompilerThre
>  255047 juha  20   0 5310648 108716 27496 S   8,6   1,3 0:00.42 C1 CompilerThre
>  253994 juha  20   0 6436132 276808 28000 R   7,3   3,4 0:01.10 C1 CompilerThre
>  255312 juha  20   0 4250424 195768 27596 R   7,0   2,4 0:00.21 C2 CompilerThre
>  254831 juha  20   0 4240356 184376 27792 S   6,3   2,3 0:00.62 C1 CompilerThre
>  254988 juha  20   0 5312696 125212 27716 S   6,3   1,5 0:00.45 C1 CompilerThre
>  254828 juha  20   0 4250424 195768 27596 S   6,0   2,4 0:00.64 C1 CompilerThre
>  254720 juha  20   0 6510340 435360 28212 S   5,0   5,3 0:00.15 flink-akka.acto
>
>
> It can be seen that the JIT related threads consume quite a lot of CPU,
> essentially leaving less CPU available to the actual test code. By using
> htop I can also see the garbage collection related threads eating CPU. This
> doesn't seem right. I think it'd mak

Re: how to print for result1 in this code

2020-10-19 Thread Khachatryan Roman
To print the result you can create a POJO class matching your projected
fields and use it on line 38 instead of String.class.

For example:
Table result1 = tableA.select(
    "content, content.hashCode() as h1, hashCode(content) as h2");
tableEnv.toDataSet(result1, HashCodeSelect.class).print();

public static class HashCodeSelect {
  public String content;
  public int h1;
  public int h2;
  // toString
}
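
The "// toString" placeholder can be filled in roughly as follows. This is a minimal standalone sketch without any Flink dependency: the wrapper class HashCodeSelectDemo and the exact output format of toString() are assumptions for illustration, and the explicit no-argument constructor is spelled out because Flink's POJO rules expect one.

```java
class HashCodeSelectDemo {

    // POJO whose public fields match the projected columns: content, h1, h2.
    public static class HashCodeSelect {
        public String content;
        public int h1;
        public int h2;

        // Public no-argument constructor, as expected for Flink POJO types.
        public HashCodeSelect() {}

        // Hypothetical format; print() only needs some readable representation.
        @Override
        public String toString() {
            return "HashCodeSelect{content='" + content + "', h1=" + h1 + ", h2=" + h2 + "}";
        }
    }

    public static void main(String[] args) {
        HashCodeSelect row = new HashCodeSelect();
        row.content = "hello";
        row.h1 = row.content.hashCode();
        row.h2 = row.content.hashCode();
        // Without toString() this would print only the class name and an identity hash.
        System.out.println(row);
    }
}
```

With toString() in place, each printed row shows the field values instead of the default object representation.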

Regards,
Roman


On Sun, Oct 18, 2020 at 5:36 AM 大森林  wrote:

> I'm learning from the official documentation,
> but the example is not complete.
>
> The code is:
> https://paste.ubuntu.com/p/Mx96MWjQ83/
>
> Could you tell me
> how to print result1
> in above code?
> (The above code is wrong)
>
>
> Thanks for your help.
>


Re: HA on AWS EMR

2020-10-19 Thread Khachatryan Roman
Hi,

Can you explain what "EMR cluster crashed" means in the 2nd scenario?
Can you also share:
- yarn.application-attempts in Flink
- yarn.resourcemanager.am.max-attempts in Yarn
- number of EMR master nodes (1 or 3)
- EMR version?

Regards,
Roman


On Mon, Oct 19, 2020 at 8:22 AM Averell  wrote:

> Hi,
>
> I'm trying to enable HA for my Flink jobs running on AWS EMR.
> Following [1], I created a common Flink YARN session and am submitting all
> my jobs to it. These 4 config params were added:
> high-availability = zookeeper
> high-availability.storageDir =
> high-availability.zookepper.path.root = /flink
> high-availability.zookeeper.quorum =  name>:2181
> (The Zookeeper that came with EMR was used)
>
> The command to start that Flink YARN session is like this:
> `flink-yarn-session -Dtaskmanager.memory.process.size=4g -nm
> FlinkCommonSession -z FlinkCommonSession -d`
>
> The first HA test - yarn application killed - went well. I killed that
> common session by using `/yarn application --kill /` and created a
> new session using the same command, then the jobs were restored
> automatically after that session was up.
>
> However, the 2nd HA test - EMR cluster crashed - didn't work: the jobs
> are not restored after the common session was created on the new EMR
> cluster (attached jobmanager.gz
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/jobmanager.gz>).
>
> Should I expect the jobs to be restored in scenario no. 2 (EMR cluster
> crashed)? Am I missing something here?
>
> Thanks for your help.
>
> Regards,
> Averell
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: NoResourceAvailableException

2020-10-19 Thread Khachatryan Roman
Hi Alex,

This message isn't actually a problem - netty can't find the native
transports and falls back to the NIO-based one.
Does increasing taskmanager.numberOfTaskSlots in flink-conf.yaml help?
Can you share the full logs in DEBUG mode?

Regards,
Roman


On Mon, Oct 19, 2020 at 6:14 PM Alexander Semeshchenko 
wrote:

> Thank you for your response.
>
> The taskmanager has 1 slot, and that slot is free, but the WordCount job
> never changes its status from "Created".
> After about 5 minutes the job is canceled.
> I attached a screenshot of the taskmanager.
>
> Best Regards
> Alexander
>
> On Wed, Oct 14, 2020 at 6:13 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi,
>> Thanks for sharing the details and sorry for the late reply.
>> You can check the number of free slots in the task manager in the web UI (
>> http://localhost:8081/#/task-manager by default).
>> Before running the program, there should be 1 TM with 1 slot available
>> which should be free (with default settings).
>>
>> If there are other jobs, you can increase slots per TM by increasing
>> taskmanager.numberOfTaskSlots in flink-conf.yaml [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-numberoftaskslots
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Oct 14, 2020 at 6:56 PM Alexander Semeshchenko 
>> wrote:
>>
>>> Hi, is there any news about my issue "Flink -
>>> NoResourceAvailableException" (post-installation WordCount job)?
>>> Best
>>>
>>> On Fri, Oct 9, 2020 at 10:19 AM Alexander Semeshchenko <
>>> as77...@gmail.com> wrote:
>>>
>>>> Yes, I took the following actions:
>>>> -   download Flink
>>>> -   ./bin/start-cluster.sh
>>>> -   ./bin/flink run ./examples/streaming/WordCount.jar
>>>> --------
>>>> Then I tried increasing the ulimit and VM memory values...
>>>> Below I put the log messages.
>>>>
>>>> It's strange, as I could run the same job on my MacBook (8 CPUs, 16 GB RAM)
>>>> and on a k8s cluster (4 CPUs, 8 GB RAM).
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> On Fri, Oct 9, 2020 at 3:32 AM Khachatryan Roman <
>>>> khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> I assume that before submitting a job you started a cluster with
>>>>> default settings with ./bin/start-cluster.sh.
>>>>>
>>>>> Did you submit any other jobs?
>>>>> Can you share the logs from log folder?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Wed, Oct 7, 2020 at 11:03 PM Alexander Semeshchenko <
>>>>> as77...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> <https://stackoverflow.com/posts/64252040/timeline>
>>>>>>
>>>>>> Installing (download & tar zxf) Apache Flink 1.11.1 and running
>>>>>> ./bin/flink run examples/streaming/WordCount.jar
>>>>>> shows the following message after about 5 minutes of trying to submit:
>>>>>>
>>>>>> Caused by:
>>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>>> Could not allocate the required slot within slot request timeout. Please
>>>>>> make sure that the cluster has enough resources.
>>>>>>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>>>>>>   ... 45 more
>>>>>> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>>>>>>   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>>>>>   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>>>>>   at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>>>>>>   at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>>>>>
>>>>>> It's the default Flink configuration.
>>>>>>
>>>>>> Architecture: x86_64
>>>>>> CPU op-mode(s): 32-bit, 64-bit
>>>>>> Byte Order: Little Endian
>>>>>> CPU(s): 8
>>>>>> On-line CPU(s) list: 0-7
>>>>>> Thread(s) per core: 1
>>>>>> Core(s) per socket: 1
>>>>>>
>>>>>> free -g
>>>>>>        total  used  free  shared  buff/cache  available
>>>>>> Mem:      62     1    23       3          37         57
>>>>>> Swap:      7     0     7
>>>>>>
>>>>>> Is there any advice about what happened?
>>>>>>
>>>>>


Re: ZooKeeper connection SUSPENDING

2020-10-19 Thread Khachatryan Roman
Hi,

AFAIK, the features discussed in the threads you mentioned are not yet
implemented. So there is no way to avoid Job restarts in case of ZK rolling
restarts.
I'm pulling in Till as he might know better.

Regards,
Roman


On Fri, Oct 16, 2020 at 7:45 PM Kenzyme  wrote:

> Hi,
>
> Related to
> https://mail-archives.apache.org/mod_mbox/flink-dev/201709.mbox/%3CCA+faj9yvPyzmmLoEWAMPgXDP6kx+0oed1Z5k4s3K9sgiCFyb=w...@mail.gmail.com%3E
> and https://issues.apache.org/jira/browse/FLINK-10052, I was wondering if
> there's a way to prevent Flink instances from failing while doing a rolling
> restart on ZK followers while still keeping the quorum?
>
> This is what was shown in Flink logs while restarting ZK :
> ZooKeeper connection SUSPENDING. Changes to the submitted job graphs are
> not monitored (temporarily).
>
> I was able to reproduce this twice with a quorum of 5 ZK nodes while doing
> some ZK maintenance.
>
> Thanks!
>
> Kenzyme Le
>
>
>


Re: Building Flink on VirtualBox VM failing

2020-10-19 Thread Khachatryan Roman
Hey,

One reason could be that a resource-intensive test was killed by the OOM
killer. You can inspect /var/log/kern.log for the related messages in your
VM.
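
A quick way to scan the log is sketched below; a plain grep over the file would do the same job. The class name OomLogScan and the substrings it matches are assumptions for illustration, since the exact wording of OOM killer messages varies between kernel versions, and reading /var/log/kern.log may require elevated permissions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

class OomLogScan {

    // Substrings assumed to appear in OOM killer messages; adjust for your kernel.
    static boolean looksLikeOomLine(String line) {
        String lower = line.toLowerCase(Locale.ROOT);
        return lower.contains("out of memory")
                || lower.contains("oom-kill")
                || lower.contains("oom_kill");
    }

    // Keeps only the lines that look OOM-related, preserving their order.
    static List<String> filterOomLines(List<String> lines) {
        return lines.stream()
                .filter(OomLogScan::looksLikeOomLine)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the path mentioned above; pass another file as the first argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "/var/log/kern.log");
        filterOomLines(Files.readAllLines(log)).forEach(System.out::println);
    }
}
```

If the output names a java process around the time the build failed, the VM most likely needs more memory or the build needs fewer parallel forks.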

Regards,
Roman


On Mon, Oct 19, 2020 at 5:57 PM Juha Mynttinen 
wrote:

>
> Hey,
>
> I'm trying to build Flink and failing. I'm running Ubuntu 20.04.1 in
> a virtual machine on Windows 10. I'm using OpenJDK 11.0.8. I'm on the
> master branch, commit 9eae578ae592254d54bc51c679644e8e84c65152.
>
> The command I'm using:
>
> apache-maven-3.2.5/bin/mvn clean verify
>
> The output:
>
> [INFO] Flink : Tests .. FAILURE [14:38 min]
> [INFO] Flink : Streaming Scala  SKIPPED
> [INFO] Flink : Connectors : HCatalog .. SKIPPED
> [INFO] Flink : Connectors : Base .. SKIPPED
> [INFO] Flink : Connectors : Files . SKIPPED
> [INFO] Flink : Table :  SKIPPED
> [INFO] Flink : Table : Common . SKIPPED
> [INFO] Flink : Table : API Java ... SKIPPED
> [INFO] Flink : Table : API Java bridge  SKIPPED
> [INFO] Flink : Table : API Scala .. SKIPPED
> [INFO] Flink : Table : API Scala bridge ... SKIPPED
> [INFO] Flink : Table : SQL Parser . SKIPPED
> [INFO] Flink : Libraries :  SKIPPED
> [INFO] Flink : Libraries : CEP  SKIPPED
> [INFO] Flink : Table : Planner  SKIPPED
> [INFO] Flink : Table : SQL Parser Hive  SKIPPED
> [INFO] Flink : Table : Runtime Blink .. SKIPPED
> [INFO] Flink : Table : Planner Blink .. SKIPPED
> [INFO] Flink : Metrics : JMX .. SKIPPED
> [INFO] Flink : Formats : .. SKIPPED
> [INFO] Flink : Formats : Json . SKIPPED
> [INFO] Flink : Connectors : Kafka base  SKIPPED
> [INFO] Flink : Connectors : Elasticsearch base  SKIPPED
> [INFO] Flink : Connectors : Elasticsearch 5 ... SKIPPED
> [INFO] Flink : Connectors : Elasticsearch 6 ... SKIPPED
> [INFO] Flink : Connectors : Elasticsearch 7 ... SKIPPED
> [INFO] Flink : Connectors : HBase base  SKIPPED
> [INFO] Flink : Connectors : HBase 1.4 . SKIPPED
> [INFO] Flink : Connectors : HBase 2.2 . SKIPPED
> [INFO] Flink : Formats : Hadoop bulk .. SKIPPED
> [INFO] Flink : Formats : Orc .. SKIPPED
> [INFO] Flink : Formats : Orc nohive ... SKIPPED
> [INFO] Flink : Formats : Avro . SKIPPED
> [INFO] Flink : Formats : Parquet .. SKIPPED
> [INFO] Flink : Formats : Csv .. SKIPPED
> [INFO] Flink : Connectors : Hive .. SKIPPED
> [INFO] Flink : Connectors : JDBC .. SKIPPED
> [INFO] Flink : Connectors : RabbitMQ .. SKIPPED
> [INFO] Flink : Connectors : Twitter ... SKIPPED
> [INFO] Flink : Connectors : Nifi .. SKIPPED
> [INFO] Flink : Connectors : Cassandra . SKIPPED
> [INFO] Flink : Connectors : Filesystem  SKIPPED
> [INFO] Flink : Connectors : Kafka . SKIPPED
> [INFO] Flink : Connectors : Google PubSub . SKIPPED
> [INFO] Flink : Connectors : Kinesis ... SKIPPED
> [INFO] Flink : Connectors : SQL : Elasticsearch 6 . SKIPPED
> [INFO] Flink : Connectors : SQL : Elasticsearch 7 . SKIPPED
> [INFO] Flink : Connectors : SQL : HBase 1.4 ... SKIPPED
> [INFO] Flink : Connectors : SQL : HBase 2.2 ... SKIPPED
> [INFO] Flink : Connectors : SQL : Hive 1.2.2 .. SKIPPED
> [INFO] Flink : Connectors : SQL : Hive 2.2.0 .. SKIPPED
> [INFO] Flink : Connectors : SQL : Hive 2.3.6 .. SKIPPED
> [INFO] Flink : Connectors : SQL : Hive 3.1.2 .. SKIPPED
> [INFO] Flink : Connectors : SQL : Kafka ... SKIPPED
> [INFO] Flink : Formats : Avro confluent registry .. SKIPPED
> [INFO] Flink : Formats : Sequence file  SKIPPED
> [INFO] Flink : Formats : Compress . SKIPPED
> [INFO] Flink : Formats : SQL Orc .. SKIPPED
> [INFO] Flink : Formats : SQL Parquet .. SKIPPED
> [INFO] Flink : Formats : SQL Avro . SKIPPED
> [INFO] Flink : Examples : Streaming ... SKIPPED
> [INFO] Flink : Examples : Table ... SKIPPED
> [INFO] Flink : Examples : Build Helper : .. SKIPPED
> [INFO] Flink : Examples : Build 

Re: Un-ignored Parsing Exceptions in the CsvFormat

2020-10-19 Thread Khachatryan Roman
Hey Austin,

I think you are right. The problematic row contains an odd number of
delimiters in which case skipFields will return -1, which in turn leads to
an exception.

I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711 to
fix it.
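
To illustrate the failure mode, here is a minimal standalone sketch. It is not Flink's RowCsvInputFormat code: the class LenientCsvSketch and the methods skipField and countFields are hypothetical, and treating an unbalanced quote as the cause of the -1 is an assumption. It only shows how a negative position from a field-skipping helper turns into an exception unless the caller checks a lenient flag first.

```java
class LenientCsvSketch {

    /**
     * Returns the index just past the field starting at pos,
     * or -1 if the field opens a quote that is never closed.
     * Escaped quotes are not handled; this is a simplification.
     */
    static int skipField(String row, int pos, char delimiter, char quote) {
        if (pos < row.length() && row.charAt(pos) == quote) {
            int close = row.indexOf(quote, pos + 1);
            if (close < 0) {
                return -1; // unbalanced quote: no valid end position
            }
            pos = close + 1;
        }
        int next = row.indexOf(delimiter, pos);
        return next < 0 ? row.length() : next + 1;
    }

    /**
     * Counts the fields in a row. For a malformed row it returns -1 when
     * lenient is true (the caller can drop the row) and throws otherwise.
     */
    static int countFields(String row, boolean lenient) {
        int count = 0;
        int pos = 0;
        while (pos < row.length()) {
            pos = skipField(row, pos, ',', '"');
            if (pos < 0) {
                if (lenient) {
                    return -1; // ignore the parse error instead of failing the job
                }
                throw new IllegalArgumentException(
                        "Unexpected parser position for column " + (count + 1));
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countFields("a,\"b\",c", false)); // well-formed row
        System.out.println(countFields("a,\"b,c", true));    // malformed row, lenient mode
    }
}
```

The point of the sketch is the order of the checks: the lenient flag is consulted before the negative position is turned into an exception.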

Regards,
Roman


On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].
>
> Even with the `ignoreParseErrors()` set, the job fails when it encounters
> some types of malformed rows. The root cause is indeed a `ParseException`,
> so I'm wondering if there's anything more I need to do to ignore these
> rows. Each field in the schema is a STRING.
>
>
> I've configured the CSV format and table like so:
>
> tableEnv.connect(
> new FileSystem()
> .path(path)
> )
> .withFormat(
> new Csv()
> .quoteCharacter('"')
> .ignoreParseErrors()
> )
> .withSchema(schema)
> .inAppendMode()
>
>
> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
> to `isLenient()` if there is an unexpected parser position?[2]
>
> Example error:
>
> 2020-10-16 12:50:18
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception when processing split: null
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
> parser position for column 1 of row '",
> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
> ""company,'
> at
> org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
> at
> org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
> at
> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>
>
> Thanks,
> Austin
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
> [2]:
> https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206
>


Re: akka.framesize configuration does not runtime execution

2020-10-19 Thread Khachatryan Roman
Hi Yuval,

I'm also wondering why you have such a big metadata file.
You could probably reduce it by decreasing
"state.backend.fs.memory-threshold" (if you haven't done so already) [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-backend-fs-memory-threshold

Regards,
Roman


On Mon, Oct 19, 2020 at 7:26 AM Yun Tang  wrote:

> Hi Yuval
>
> First of all, large savepoint metadata does not necessarily require a very
> large akka frame size. Writing metadata to an external file system calls an
> IO-write method [1] instead of sending an RPC message.
>
> Secondly, a savepoint does not store any configuration; it only stores
> checkpointed state.
>
> BTW, why do you have an RPC message larger than 1 GB?
>
> [1]
> https://github.com/apache/flink/blob/f705f0af6ba50f6e68c22484d1daeda842518d27/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L313
>
> Best
> Yun Tang
> --
> *From:* Yuval Itzchakov 
> *Sent:* Thursday, October 15, 2020 21:22
> *To:* user 
> *Subject:* akka.framesize configuration does not runtime execution
>
> Hi,
>
> Due to a very large savepoint metadata file (3GB +), I've set the
> akka.framesize that is being required to 5GB. I set this via flink.conf
> `akka.framesize` property.
>
> When trying to recover from the savepoint, the JM emits the following
> error message:
>
> "thread":"flink-akka.actor.default-dispatcher-30"
> "level":"ERROR"
> "loggerName":"akka.remote.EndpointWriter"
> "message":"Transient "Discarding oversized payload sent to
> Actor[akka.tcp://flink@XXX:XXX/user/taskmanager_0#369979612]: max allowed
> size 1073741824 bytes, actual size of encoded class
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 1610683118
> "name":"akka.remote.OversizedPayloadException"
>
> As I recall, while taking the savepoint the maximum framesize was indeed
> defined as 1GB.
>
> Could it be that akka.framesize is being recovered from the stored
> savepoint, thus not allowing me to re-configure the maximum size
> of the payload?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: NoResourceAvailableException

2020-10-09 Thread Khachatryan Roman
I assume that before submitting a job you started a cluster with default
settings with ./bin/start-cluster.sh.

Did you submit any other jobs?
Can you share the logs from log folder?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:03 PM Alexander Semeshchenko 
wrote:

>
> 
>
> Installing (download & tar zxf) Apache Flink 1.11.1 and running
> ./bin/flink run examples/streaming/WordCount.jar
> shows the following message after about 5 minutes of trying to submit:
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>   ... 45 more
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>   at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> It's the default Flink configuration.
>
> Architecture: x86_64
> CPU op-mode(s): 32-bit, 64-bit
> Byte Order: Little Endian
> CPU(s): 8
> On-line CPU(s) list: 0-7
> Thread(s) per core: 1
> Core(s) per socket: 1
>
> free -g
>        total  used  free  shared  buff/cache  available
> Mem:      62     1    23       3          37         57
> Swap:      7     0     7
>
> Is there any advice about what happened?
>


Re: Network issue leading to "No pooled slot available"

2020-10-09 Thread Khachatryan Roman
Thanks for checking this workaround!

I've created a jira issue [1] to check if AWS SDK version can be upgraded
in Flink distribution.

Regards,
Roman


On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse  wrote:

> Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
> appears to respect interrupts in a test case I created. (the test fails
> with the SDK that is in use by Flink)
>
> I will try it in a full fledged Flink environment and report back.
>
> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse  wrote:
>
>> Did some digging... it definitely appears that the Amazon SDK is
>> not picking up the interrupt.  I will try playing with the connection
>> timeout. Hadoop defaults it to 20 ms, which may be part of the problem.
>> Anyone have any other ideas?
>>
>> In theory this should be fixed by SDK v2 which uses NIO, but I don't
>> think I'm up for all the changes that would involve in the downstream
>> components.
>>
>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse  wrote:
>>
>>> Using the latest - 1.11.2.
>>>
>>> I would assume the interruption is being ignored in the Hadoop / S3
>>> layer. I was looking at the defaults and (if I understood correctly) the
>>> client will retry 20 times. Which would explain why it never gets
>>> cancelled...
>>>
>>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi Dan Diephouse,
>>>>
>>>> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
>>>> where 2 is a bug.
>>>> It's unclear though where the interruption is ignored (Flink/Hadoop
>>>> FS/S3 client).
>>>>
>>>> What version of Flink are you using?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:
>>>>
>>>>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>>>>> If/when the network connection has issues, it seems to put Flink into an
>>>>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>>>>> how to troubleshoot / fix?
>>>>>
>>>>> Here is what I'm observing:
>>>>>
>>>>> *1. Network is dropped *
>>>>>
>>>>> *2. S3 connections do not exit gracefully*
>>>>>
>>>>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>>>>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
>>>>> not react to cancelling signal for 30 seconds, but is stuck in method:
>>>>>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>>>>> java.base@14.0.2
>>>>> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>>>>> java.base@14.0.2
>>>>> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>>>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>>>>> java.base@14.0.2
>>>>> /sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>>>>> java.base@14.0.2
>>>>> /java.net.Socket$SocketInputStream.read(Socket.java:982)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>>>>> java.base@14.0.2
>>>>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>>>>
>>>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>>>>
>>>>> app//org.apache.http

Re: How can I increase the parallelism on the Table API for Streaming Aggregation?

2020-10-08 Thread Khachatryan Roman
Hi Felipe,

Your source is not parallel, so it doesn't make sense to make the local
group operator parallel.
If the source implemented ParallelSourceFunction, subsequent operators
would be parallelized too.

Regards,
Roman


On Thu, Oct 8, 2020 at 5:00 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I was implementing the stream aggregation using Table API [1] and
> trying out the local aggregation plan to optimize the query. Basically
> I had to configure it like this:
>
> Configuration configuration = tableEnv.getConfig().getConfiguration();
> // set low-level key-value options
> configuration.setInteger("table.exec.resource.default-parallelism", 4);
> // local-global aggregation depends on mini-batch is enabled
> configuration.setString("table.exec.mini-batch.enabled", "true");
> configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
> configuration.setString("table.exec.mini-batch.size", "1000");
> // enable two-phase, i.e. local-global aggregation
> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>
> and when I looked at the query plan on the dashboard I realized that the
> LocalGroupAggregate has parallelism 1 while the
> GlobalGroupAggregate has parallelism 4. Why doesn't the
> LocalGroupAggregate also have parallelism 4, since I set it via the
> property "table.exec.resource.default-parallelism"? Here is my code
> [2].
>
> Thanks,
> Felipe
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


Re: Network issue leading to "No pooled slot available"

2020-10-08 Thread Khachatryan Roman
Hi Dan Diephouse,

From the logs you provided indeed it looks like 1 causes 2 => 3 => 4, where
2 is a bug.
It's unclear though where the interruption is ignored (Flink/Hadoop FS/S3
client).

What version of Flink are you using?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:

> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
> If/when the network connection has issues, it seems to put Flink into an
> irrecoverable state. Am I understanding this correctly? Any suggestions on
> how to troubleshoot / fix?
>
> Here is what I'm observing:
>
> *1. Network is dropped *
>
> *2. S3 connections do not exit gracefully*
>
> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
> not react to cancelling signal for 30 seconds, but is stuck in method:
>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
> java.base@14.0.2
> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>
> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>
> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>
> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
> java.base@14.0.2
> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>
> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>
> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>
> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>
> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>
> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>
> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>
> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>
> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>
> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>
> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>
> app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
>
> 

Re: Reading from HDFS and publishing to Kafka

2020-09-27 Thread Khachatryan Roman
Hi,

1. Yes, StreamExecutionEnvironment.readFile can be used for files on HDFS
2. I think this is a valid concern. Besides that, there are plans to
deprecate DataSet API [1]
4. Yes, the approach looks good

I'm pulling in Aljoscha for your 3rd question (and probably some
clarifications on others).

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

Regards,
Roman


On Fri, Sep 25, 2020 at 12:50 PM Damien Hawes 
wrote:

> Hi folks,
>
> I've got the following use case, where I need to read data from HDFS and
> publish the data to Kafka, such that it can be reprocessed by another job.
>
> I've searched the web and read the docs. This has turned up no
> concrete examples or information on how this is achieved, or even if it's
> possible at all.
>
> Further context:
>
> 1. Flink will be deployed to Kubernetes.
> 2. Kerberos is active on Hadoop.
> 3. The data is stored on HDFS as Avro.
> 4. I cannot install Flink on our Hadoop environment.
> 5. No stateful computations will be performed.
>
> I've noticed that the flink-avro package provides a class called
> AvroInputFormat, with a nullable path field, and I think this is my goto.
>
> Apologies for the poor formatting ahead, but the code I have in mind looks
> something like this:
>
>
>
> StreamExecutionEnvironment env = ...;
> AvroInputFormat<Source> inf = new AvroInputFormat<>(null, Source.class);
> DataStreamSource<Source> stream = env.readFile(inf, "hdfs://path/to/data");
> // rest, + publishing to Kafka using the FlinkKafkaProducer
>
>
>
> My major questions and concerns are:
>
> 1. Is it possible to read from HDFS using the
> StreamExecutionEnvironment object? I'm planning on using the Data Stream
> API because of point (2) below.
> 2. Because Flink will be deployed on Kubernetes, I have a major concern
> that if I were to use the Data Set API, once Flink completes and exits, the
> pods will restart, causing unnecessary duplication of data. Is the pod
> restart a valid concern?
> 3. Is there anything special I need to be worried about regarding Kerberos
> in this instance? The key tab will be materialised on the pods upon start
> up.
> 4. Is this even a valid approach? The dataset I need to read and replay is
> small (12 TB).
>
> Any help, even in part will be appreciated.
>
> Kind regards,
>
> Damien
>
>
>
>


Re: Hiring Flink developers

2020-09-27 Thread Khachatryan Roman
Please use the user mailing list for questions related to the use of Flink.
See [1] for the other lists.

[1] https://flink.apache.org/community.html#mailing-lists

Regards,
Roman


On Sun, Sep 27, 2020 at 8:29 AM Dan Hill  wrote:

> I'm looking to hire Flink developers (full time or contractors) to work on
> a specialized user event logging system.  Besides the usual developer
> hiring websites, what are good hiring sources for Flink developers?
>
> Thanks!
> - Dan
>


Re: Flink being used in other open source projects?

2020-09-27 Thread Khachatryan Roman
Hi,

Apache Beam [1] and Zeppelin [2] can use Flink.
I don't think there are Flink setups used by open-source projects.

[1] https://beam.apache.org/documentation/runners/flink/
[2] https://zeppelin.apache.org/docs/0.9.0-SNAPSHOT/interpreter/flink.html

Regards,
Roman


On Fri, Sep 25, 2020 at 6:05 PM vinuthomas2008 
wrote:

> Hi All,
>
> Very new to Flink.
>
> Are there any open source projects using Flink? I would like to be
> involved in a project that uses Flink!!
>
> Thanks
> VT
>


Re: Best way to resolve bottlenecks with Flink?

2020-09-25 Thread Khachatryan Roman
The closest thing is the backpressure status which you mentioned.
From there, you can troubleshoot specific subtasks by inspecting their
metrics.
There is no health summary in Flink at the moment.

Regards,
Roman


On Fri, Sep 25, 2020 at 5:35 AM Dan Hill  wrote:

> My job has very slow throughput.  What are the best signals that will
> indicate if there are performance issues?
>
> Is there an overall health summary that would indicate the most likely
> issues impacting performance?
>
> I found a variety of pages and metrics.  I resolved some of the
> backpressure in my job.  I looked at memory issues.  My direct "Outside
> JVM" is full (but I'm not sure if that's normal).
>


Re: FileSystemHaServices and BlobStore

2020-08-31 Thread Khachatryan Roman
+ dev

The blob store is used for JARs, serialized job and task information, and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> --
> *From:* Khachatryan Roman 
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices: a single leader, but
> with persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure whether I need
> FileSystemBlobStore or whether VoidBlobStore is enough. I can't figure out
> whether the BlobStore should survive a JobManager crash. I see that
> ZookeeperHaServices uses FileSystemBlobStore, but it is not clear whether
> that is due to having multiple JobManagers (leader + follower) or to the
> necessity of preserving BLOBs on restart.
>
> Thanks,
> Alexey
>
>


Re: FileSystemHaServices and BlobStore

2020-08-28 Thread Khachatryan Roman
Hello Alexey,

I think you need FileSystemBlobStore as you are implementing HA Services,
and BLOBs should be highly available too.
However, I'm a bit concerned about the direction in general: it essentially
means re-implementing ZK functionality on top of FS.
What are the motivation and the use case?

Regards,
Roman


On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:

> Hello,
> I'm thinking about implementing FileSystemHaServices: a single leader, but
> with persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure whether I need
> FileSystemBlobStore or whether VoidBlobStore is enough. I can't figure out
> whether the BlobStore should survive a JobManager crash. I see that
> ZookeeperHaServices uses FileSystemBlobStore, but it is not clear whether
> that is due to having multiple JobManagers (leader + follower) or to the
> necessity of preserving BLOBs on restart.
>
> Thanks,
> Alexey
>


Re: Not able to Assign Watermark in Flink 1.11

2020-08-28 Thread Khachatryan Roman
Hi Anuj Jain,

You need to provide the type parameter when calling
WatermarkStrategy.forBoundedOutOfOrderness like this:

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15))
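
For illustration only, here is a library-free Java sketch of why the explicit
type argument is needed. Strategy, boundedOutOfOrderness and
withTimestampAssigner below are hypothetical stand-ins that merely mimic the
shape of the Flink call (a generic static factory followed by a chained
method); they are not Flink classes. The type parameter appears only in the
factory's return type, and the factory call is the receiver of a chained call,
so the compiler cannot infer it from the rest of the chain and falls back to
Object unless it is spelled out:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

class TypeWitnessDemo {

    // Hypothetical stand-in for a generic strategy type; NOT a Flink class.
    static final class Strategy<T> {
        private final Duration maxOutOfOrderness;
        private final ToLongFunction<T> timestampAssigner;

        private Strategy(Duration maxOutOfOrderness, ToLongFunction<T> timestampAssigner) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            this.timestampAssigner = timestampAssigner;
        }

        // Generic static factory: T occurs only in the return type,
        // so it cannot be inferred from the Duration argument.
        static <T> Strategy<T> boundedOutOfOrderness(Duration maxOutOfOrderness) {
            return new Strategy<>(maxOutOfOrderness, element -> Long.MIN_VALUE);
        }

        // Returns a copy of this strategy that uses the given assigner.
        Strategy<T> withTimestampAssigner(ToLongFunction<T> assigner) {
            return new Strategy<>(maxOutOfOrderness, assigner);
        }

        long extractTimestamp(T element) {
            return timestampAssigner.applyAsLong(element);
        }
    }

    static Strategy<Map<String, Object>> build() {
        // The explicit <Map<String, Object>> fixes T before the chained call,
        // so the lambda parameter is a Map and m.get(...) resolves.
        return Strategy.<Map<String, Object>>boundedOutOfOrderness(Duration.ofMinutes(15))
                .withTimestampAssigner(m -> (Long) m.get("event_ts"));
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("event_ts", 1598600000000L);
        System.out.println(build().extractTimestamp(event));
    }
}
```

Removing the `<Map<String, Object>>` part turns the lambda parameter into a
plain Object and the chained call stops compiling, which is roughly the
situation in the original snippet.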

Regards,
Roman


On Fri, Aug 28, 2020 at 6:49 AM aj  wrote:

>
> I am getting this error when trying to assign watermark in Flink  1.11
>
> *"Cannot resolve method 'withTimestampAssigner(anonymous
> org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)'"*
>
> FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new
> FlinkKafkaConsumer<>(topics,
> new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
> properties);
>
> bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
> .withTimestampAssigner(new SerializableTimestampAssigner<GenericRecord>() {
>   @Override
>   public long extractTimestamp(GenericRecord genericRecord, long l) {
>     return (long) genericRecord.get("event_ts");
>   }
> }));
>
>
> What is wrong with this.
>
> In Flink 1.9 I was using this function and it was working fine
>
> public static class SessionAssigner implements
> AssignerWithPunctuatedWatermarks<GenericRecord> {
>
>   @Override
>   public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>     long timestamp = (long) record.get("event_ts");
>     // LOGGER.info("timestamp", timestamp);
>     return timestamp;
>   }
>
>   @Override
>   public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>     // simply emit a watermark with every event
>     // LOGGER.info("extractedTimestamp ", extractedTimestamp);
>     return new Watermark(extractedTimestamp);
>   }
> }
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-05 Thread Khachatryan Roman
Hi Lu,

AFAIK, it's not going to be fixed. As you mentioned in the first email,
Kafka should be configured so that its transaction timeout is greater than
your max checkpoint duration.

However, you should not only change transaction.timeout.ms in the producer
but also transaction.max.timeout.ms on your brokers.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
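
As a rough sketch of the relationship between the two settings: the producer's
transaction.timeout.ms must not exceed the brokers'
transaction.max.timeout.ms, and it should comfortably exceed the longest
checkpoint plus restart time. The helper below uses only java.util.Properties;
the class name, the method name and all numbers are hypothetical and chosen
for illustration, and the broker value is simply modelled as a second
Properties object rather than read from a real broker. To the best of my
knowledge the linked caveats section mentions a broker default of 15 minutes
and a FlinkKafkaProducer default of 1 hour, but please verify that against
your versions.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class TransactionTimeoutCheck {

    // Kafka setting names referenced in this thread.
    static final String PRODUCER_TIMEOUT = "transaction.timeout.ms";
    static final String BROKER_MAX_TIMEOUT = "transaction.max.timeout.ms";

    // Hypothetical helper: returns a description of each violated constraint.
    // Both properties are assumed to be set explicitly.
    static List<String> findProblems(Properties producerProps,
                                     Properties brokerProps,
                                     Duration maxCheckpoint,
                                     Duration maxRestart) {
        long producerTimeout = Long.parseLong(producerProps.getProperty(PRODUCER_TIMEOUT));
        long brokerMax = Long.parseLong(brokerProps.getProperty(BROKER_MAX_TIMEOUT));
        long required = maxCheckpoint.plus(maxRestart).toMillis();

        List<String> problems = new ArrayList<>();
        if (producerTimeout > brokerMax) {
            // The broker rejects producer timeouts above its configured maximum.
            problems.add(PRODUCER_TIMEOUT + " (" + producerTimeout + ") exceeds "
                    + BROKER_MAX_TIMEOUT + " (" + brokerMax + ")");
        }
        if (producerTimeout <= required) {
            // Transactions may expire before they can be committed after recovery.
            problems.add(PRODUCER_TIMEOUT + " (" + producerTimeout
                    + ") does not exceed checkpoint + restart time (" + required + ")");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties producer = new Properties();
        producer.setProperty(PRODUCER_TIMEOUT, "3600000"); // illustrative: 1 hour

        Properties broker = new Properties();
        broker.setProperty(BROKER_MAX_TIMEOUT, "900000");  // illustrative: 15 minutes

        findProblems(producer, broker, Duration.ofMinutes(10), Duration.ofMinutes(5))
                .forEach(System.out::println);
    }
}
```

With these example values the first check fires, which is why the broker
setting has to be raised together with the producer setting.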

Regards,
Roman


On Wed, Aug 5, 2020 at 12:24 AM Lu Niu  wrote:

> Hi, Khachatryan
>
> Thank you for the reply. Is that a problem that can be fixed? If so, is
> the fix on roadmap? Thanks!
>
> Best
> Lu
>
> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Lu,
>>
>> Yes, this error indicates data loss (unless there were no records in the
>> transactions).
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu  wrote:
>>
>>> Hi,
>>>
>>> We are using end-to-end exactly-once Flink + Kafka and
>>> encountered the exception below, which usually came after checkpoint failures:
>>> ```
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>>     ... 5 more
>>> ```
>>> We did some end-to-end tests and noticed that whenever this happens,
>>> there is data loss.
>>>
>>> Referring to several related questions, I understand I need to increase
>>> `transaction.timeout.ms` because:
>>> ```
>>> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>>> that were started before taking a checkpoint, after recovering from the
>>> said checkpoint. If the time between Flink application crash and completed
>>> restart is larger than Kafka’s transaction timeout there will be data loss
>>> (Kafka will automatically abort transactions that exceeded timeout time).
>>> ```
>>>
>>> But I want to confirm with the community:
>>> Does an exception like this always lead to data loss?
>>>
>>> I ask because we sometimes get this exception even when the checkpoint
>>> succeeds.
>>>
>>> Setup:
>>> Flink 1.9.1
>>>
>>> Best
>>> Lu
>>>
>>


Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-04 Thread Khachatryan Roman
Hi Lu,

Yes, this error indicates data loss (unless there were no records in the
transactions).

Regards,
Roman


On Mon, Aug 3, 2020 at 9:14 PM Lu Niu  wrote:

> Hi,
>
> We are using end-to-end exactly-once Flink + Kafka and encountered the
> exception below, which usually came after checkpoint failures:
> ```
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>     ... 5 more
> ```
> We did some end-to-end tests and noticed that whenever this happens,
> there is data loss.
>
> Referring to several related questions, I understand I need to increase
> `transaction.timeout.ms` because:
> ```
> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
> that were started before taking a checkpoint, after recovering from the
> said checkpoint. If the time between Flink application crash and completed
> restart is larger than Kafka’s transaction timeout there will be data loss
> (Kafka will automatically abort transactions that exceeded timeout time).
> ```
>
> But I want to confirm with the community:
> Does an exception like this always lead to data loss?
>
> I ask because we sometimes get this exception even when the checkpoint
> succeeds.
>
> Setup:
> Flink 1.9.1
>
> Best
> Lu
>


Re: Flink Kafka consumer SimpleStringSchema [Deprecated]

2020-08-03 Thread Khachatryan Roman
Hi Vijay,

The javadoc for
org.apache.flink.streaming.util.serialization.SimpleStringSchema says to
use org.apache.flink.api.common.serialization.SimpleStringSchema
instead.

Regards,
Roman


On Mon, Aug 3, 2020 at 5:31 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>
> new SimpleStringSchema()  --> Is showing Deprecated in my IntelliJ.
> Although it's working fine, Wanted to check if there is a replacement for
> it ?
>
>
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092")
> properties.setProperty("group.id", "test")
> stream = env
> .addSource(new FlinkKafkaConsumer[String]("topic", new
> SimpleStringSchema(), properties))
>
> Regards,
> Vijay
>


Re: Per-job mode job restart and HA configuration

2020-08-03 Thread Khachatryan Roman
Hi Suchithra,

Yes, you need to pass these parameters to standalone-job.sh in the
Kubernetes job definition.

I'm pulling in Patrick as he might know this subject better.

Regards,
Roman


On Mon, Aug 3, 2020 at 12:24 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hello,
>
>
>
> I am using Flink version 1.10.1 in Kubernetes environment. In per-Job mode
> of flink, to achieve HA do we need zookeeper and HA parameters to restart
> the job? I am suspicious because job jar is part of the docker itself.
>
>
>
> Thanks,
>
> Suchithra
>


Re: Flink DataSet Iterate updating additional variable

2020-07-14 Thread Khachatryan Roman
Hi Antonio,

Yes, you are right. Revisiting your question, I'm wondering whether it's
possible to partition speeds and nodes in the same way (stably across
iterations)? (I'm assuming a distributed setup)
If not, each iteration would have to wait for *all* subtasks of the
previous iteration to finish, right?
That would likely negate the benefits of the iterative approach.

Regards,
Roman


On Tue, Jul 14, 2020 at 9:36 AM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hi Roman,
>
> Thank you for your quick reply, but as far as I know broadcast variables
> cannot be written, my problem is that I need to update the value of the
> speed variable to use it in the next iteration.
>
> Iterate only has one input dataset and propagates it to the next iteration
> using closeWith(), but I need another variable (maybe dataset) to be
> propagated too, is this possible in some way?
>
> Thanks
>
> On Mon, Jul 13, 2020 at 8:47 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Antonio,
>>
>> Please take a look at broadcast variables:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Jul 13, 2020 at 3:49 PM Antonio Martínez Carratalá <
>> amarti...@alto-analytics.com> wrote:
>>
>>> Hello
>>>
>>> I'm trying to implement the ForceAtlas2 (graph layout) algorithm in
>>> Flink using datasets, it is an iterative algorithm and I have most of it
>>> ready, but there is something I don't know how to do. Apart from the
>>> dataset with the coordinates (x,y) of each node I need an additional
>>> variable to regulate the speed, very simplified it would be something like
>>> this:
>>>
>>> DataSet nodes = env.fromCollection(nodesList);
>>> Double speed = 1.0;
>>> nodes.iterate(100) {
>>>nodes = nodes with forces to apply calculated;
>>>speed = speed regulated based on previous value and forces calculated;
>>>nodes = nodes coordinates updated with forces * speed;
>>> } closesWith(nodes)
>>>
>>> In my case, the nodes are the dataset I'm iterating over and it is
>>> working perfect if I forget about speed, but I don't know how to keep speed
>>> variable updated in every iteration to be able to use it in the next one
>>>
>>> Any suggestions? Thanks
>>>
>>>
>>>
>>>
>


Re: Flink DataSet Iterate updating additional variable

2020-07-13 Thread Khachatryan Roman
Hi Antonio,

Please take a look at broadcast variables:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables

Regards,
Roman


On Mon, Jul 13, 2020 at 3:49 PM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hello
>
> I'm trying to implement the ForceAtlas2 (graph layout) algorithm in Flink
> using datasets, it is an iterative algorithm and I have most of it ready,
> but there is something I don't know how to do. Apart from the dataset with
> the coordinates (x,y) of each node I need an additional variable to
> regulate the speed, very simplified it would be something like this:
>
> DataSet nodes = env.fromCollection(nodesList);
> Double speed = 1.0;
> nodes.iterate(100) {
>nodes = nodes with forces to apply calculated;
>speed = speed regulated based on previous value and forces calculated;
>nodes = nodes coordinates updated with forces * speed;
> } closesWith(nodes)
>
> In my case, the nodes are the dataset I'm iterating over and it is working
> perfect if I forget about speed, but I don't know how to keep speed
> variable updated in every iteration to be able to use it in the next one
>
> Any suggestions? Thanks
>
>
>
>


Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

2020-07-13 Thread Khachatryan Roman
Hi Manas,

Do you have the same error if you replace

.group_by("five_sec_window, monitorId") \

with

.group_by("five_sec_window") \

?

Regards,
Roman


On Mon, Jul 13, 2020 at 11:16 AM Manas Kale  wrote:

> Hi,
> I have the following piece of code (for pyFlink v1.11) :
>
> t_env.from_path(INPUT_TABLE) \
> .select("monitorId, data, rowtime") \
> .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
> .group_by("five_sec_window, monitorId") \
> .select("monitorId, data.avg, data.min, data.max, 
> five_sec_window.rowtime") \
> .execute_insert(OUTPUT_TABLE)
>
> Which is generating the exception :
>
> Traceback (most recent call last):
>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
>     return Table(self._j_table.select(fields), self._t_env)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>     return f(*a, **kw)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
>
> The "rowtime" attribute in INPUT_TABLE is created as :
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(exec_env,
>   
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>   )
>
> ...
>
>  .field("rowtime", DataTypes.TIMESTAMP(3))
> .rowtime(
> Rowtime()
> .timestamps_from_field("time_st")
> .watermarks_periodic_ascending())
>
> ).create_temporary_table(INPUT_TABLE)
>
>
> What is wrong with the code? I believe that I have already indicated which
> attribute has to be treated as the time attribute.
>
> Thank you,
> Manas
>


Re: Error --GC Cleaner Provider -- Flink 1.11.0

2020-07-13 Thread Khachatryan Roman
Hi Murali Pusala,

Thanks for reporting this issue.

Looks like JavaGcCleanerWrapper should use getDeclaredMethod instead of
getMethod to find Reference.tryHandlePending.
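
To illustrate the difference: Class.getMethod only returns public member
methods (including inherited ones), while Class.getDeclaredMethod returns
methods of any visibility that are declared directly on the class. The sketch
below is self-contained; Holder and its method are hypothetical stand-ins for
a class with a non-public static method and do not touch the real
java.lang.ref.Reference or Flink's JavaGcCleanerWrapper.

```java
import java.lang.reflect.Method;

class ReflectionLookupDemo {

    // Hypothetical stand-in for a class exposing a non-public static method.
    static class Holder {
        private static boolean tryHandlePending(boolean waitForNotify) {
            return waitForNotify;
        }
    }

    // getMethod only sees public methods, so this lookup fails.
    static boolean foundByGetMethod() {
        try {
            Holder.class.getMethod("tryHandlePending", boolean.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // getDeclaredMethod also sees private methods declared on the class;
    // setAccessible(true) is then needed before invoking it reflectively.
    static boolean foundByGetDeclaredMethod() {
        try {
            Method m = Holder.class.getDeclaredMethod("tryHandlePending", boolean.class);
            m.setAccessible(true);
            return (Boolean) m.invoke(null, true);
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("getMethod found it: " + foundByGetMethod());
        System.out.println("getDeclaredMethod found it: " + foundByGetDeclaredMethod());
    }
}
```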

@Andrey, can you confirm this?

Regards,
Roman


On Mon, Jul 13, 2020 at 4:42 AM Murali Krishna Pusala <
muralipus...@icloud.com> wrote:

>
> Hi All,
>
> I have written simple Java code that reads data using Hive and transforms it
> using the Table API (Blink planner) with Flink 1.11.0 on an HDP cluster. I am
> encountering a "java.lang.Error: Failed to find GC Cleaner among available
> providers" error. The full stack trace is at the end of the email.
>
> Has anyone encountered the same issue, or does anyone have solutions/suggestions?
>
> Cluster Config:
>
> * Hadoop Version: 2.7.3
> * Java Version: 1.8.0_40
> * Flink 1.11.0 ( built from source)
> * Hive 1.2.1
>
> Thanks
> Murali Pusala
>
>
> Caused by: java.lang.Error: Failed to find GC Cleaner among available
> providers: [Legacy (before Java 9) cleaner provider, New Java 9+ cleaner
> provider]
> at
> org.apache.flink.util.JavaGcCleanerWrapper.findGcCleanerManager(JavaGcCleanerWrapper.java:149)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.util.JavaGcCleanerWrapper.(JavaGcCleanerWrapper.java:56)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.core.memory.MemoryUtils.createMemoryGcCleaner(MemoryUtils.java:111)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory(MemorySegmentFactory.java:175)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.memory.MemoryManager.lambda$allocatePages$0(MemoryManager.java:237)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.memory.MemoryManager$$Lambda$188/978816879.apply(Unknown
> Source) ~[?:?]
> at
> java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
> ~[?:1.8.0_40]
> at
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:233)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82)
> ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
> at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
> ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:184)
> ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:148)
> ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
> at LocalHashAggregateWithKeys$100.open(Unknown Source) ~[?:?]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$176/1800425807.run(Unknown
> Source) ~[?:?]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_40]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Legacy (before
> Java 9) cleaner provider: Failed to find Reference#tryHandlePending method
> at
> org.apache.flink.util.JavaGcCleanerWrapper$ReflectionUtils.findMethod(JavaGcCleanerWrapper.java:398)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.util.JavaGcCleanerWrapper$ReflectionUtils.access$1300(JavaGcCleanerWrapper.java:376)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider.createPendingCleanersRunner(JavaGcCleanerWrapper.java:326)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider.access$800(JavaGcCleanerWrapper.java:303)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-03 Thread Khachatryan Roman
I still wasn't able to reproduce the issue.

Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint?
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?

Regards,
Roman


On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu  wrote:

> Hi, Thanks for your help.
>
> The checkpoint configuration is
>
> checkpoint.intervalMS=30
> checkpoint.timeoutMS=30
>
> The error callstack is from JM's log, which happened in every cp.
> Currently I don't have a success cp yet.
>
> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman wrote:
>
>> Hi,
>>
>> Thanks for the details.
>> However, I was not able to reproduce the issue. I used parallelism levels
>> 4, file system backend and tried different timings for
>> checkpointing, windowing and source.
>> Do you encounter this problem deterministically, is it always 1st
>> checkpoint?
>> What checkpointing interval do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu  wrote:
>>
>>> Hi, this is our production code so I have to modify it a little bit,
>>> such as variable name and function name. I think 3 classes I provide here
>>> is enough.
>>>
>>> I try to join two streams, but I don't want to use the default join
>>> function, because I want to send the joined log immediately and remove it
>>> from window state immediately. And my window gap time is very long( 20
>>> minutes), so it maybe evaluate it multiple times.
>>>
>>> class JoinFunction extends
>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>
>>>   var ueState: ValueState[RawLog] = _
>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>   val invalidCounter = new LongCounter()
>>>   val processCounter = new LongCounter()
>>>   val sendToKafkaCounter = new LongCounter()
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>> ueState = getRuntimeContext.getState(
>>>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>> )
>>> gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>> getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>> getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>> getRuntimeContext.addAccumulator("sendToKafkaCounter", 
>>> this.sendToKafkaCounter)
>>>   }
>>>
>>>   override def process(key: String,
>>>ctx: Context,
>>>logs: Iterable[RawLog],
>>>out: Collector[OutputLog]): Unit = {
>>> if (ueState.value() != null) {
>>>   processCounter.add(1L)
>>>   val bid = ueState.value()
>>>   val bidLog = 
>>> gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>   logs.foreach( log => {
>>> if (log.eventType == SHOW) {
>>>   val showLog = 
>>> gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>   sendToKafkaCounter.add(1L)
>>>   out.collect(new OutputLog(ThriftUtils.serialize(showLog), 
>>> Utils.getOutputTopic(showLog)))
>>> }
>>>   })
>>> } else {
>>>   invalidCounter.add(1L)
>>> }
>>>   }
>>> }
>>>
>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>
>>>   override def onElement(log: RawLog,
>>>  timestamp: Long,
>>>  window: TimeWindow,
>>>  ctx: Trigger.TriggerContext): TriggerResult = {
>>> val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>> )
>>> val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>   new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>
>>> if (!firstSeen.value()) {
>>>   ctx.registerEventTimeTimer(window.getEnd)
>>>   firstSeen.update(true)
>>> }
>>> val eventType = log.eventType
>>> if (eventType == BID) {
>>>   ueState.update(log)
>>>   TriggerResult.CONTINUE
>>> } else {
>>>   if (ueState.value() == null) {
>>> TriggerResul

Re: Parquet data stream group converter error

2020-07-03 Thread Khachatryan Roman
Hi,

> MessageType schema = reader.getFooter().getFileMetaData().getSchema();
The first thing I'd suggest is to verify that the file contains a valid
schema and can be read by some other program, e.g. parquet-tools schema or
cat [1].

Regards,
Roman


On Thu, Jul 2, 2020 at 11:36 PM Jesse Lord  wrote:

> I am trying to read a parquet file into a datastream and then register
> that stream as a temporary table. This file is created by spark 2.4 in HDFS
> on AWS EMR. I am using flink version 1.10.0 with EMR 5.30.
>
>
>
> I am getting the following error:
>
>
>
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> Caught exception when processing split: null
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
>
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
>
> Caused by: java.lang.ClassCastException: Expected instance of group
> converter but got
> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>
> at
> org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>
> at
> org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:267)
>
> at
> org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>
> at
> org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>
> at
> org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>
> at
> org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>
> at
> org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>
> at
> org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>
> at
> org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>
> at
> org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
>
>
>
> Below is a snippet of code that shows how I am trying to read the parquet
> file:
>
>
>
> String filePath = "hdfs:///path/to/single/file.parquet";
>
> ParquetFileReader reader = ParquetFileReader.open(
>     HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath), new Configuration()));
>
> MessageType schema = reader.getFooter().getFileMetaData().getSchema();
>
> String parquetPath = "hdfs:///path/to/parquet/directory";
>
> DataStream parquetStream = env.readFile(
>     new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath), schema),
>     parquetPath);
>
> Table parquetTable = tEnv.fromDataStream(parquetStream);
>
> tEnv.createTemporaryView("isession", parquetTable);
>
>
>
> Thanks,
>
> Jesse
>


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Khachatryan Roman
window: TimeWindow,
>evictorContext: Evictor.EvictorContext): Unit = {}
>
>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>size: Int,
>window: TimeWindow,
>evictorContext: Evictor.EvictorContext): Unit = {
> val iter = elements.iterator()
> while (iter.hasNext) {
>   iter.next()
>   iter.remove()
> }
>   }
> }
>
>
> Khachatryan Roman  于2020年7月2日周四 下午7:18写道:
>
>> Thanks for the clarification.
>>
>> Can you also share the code of other parts, particularly MyFunction?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu  wrote:
>>
>>> Rocksdb backend has the same problem
>>>
>>> Khachatryan Roman  于2020年7月2日周四 下午6:11写道:
>>>
>>>> Thanks for reporting this.
>>>>
>>>> Looks like the window namespace was replaced by VoidNamespace in state
>>>> entry.
>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>> further investigate it.
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu  wrote:
>>>>
>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>> evictor. The state is stored to memory.
>>>>>
>>>>> input.setParallelism(processParallelism)
>>>>> .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>> .keyBy(_.key)
>>>>> .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>> .trigger(new MyTrigger)
>>>>> .evictor(new MyEvictor)
>>>>> .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>> .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>> .name("kafka-record-sink")
>>>>>
>>>>> And the exception stack is here, could anyone help with this? Thanks!
>>>>>
>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>>> Window(TumblingEventTimeWindows(120), JoinTrigger, JoinEvictor,
>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>> at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>>>>> .java:1100)
>>>>> at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>> ThreadPoolExecutor.java:1149)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>> ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>>>> VoidNamespace
>>>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>> at org.apache.flink.runtime.concurrent.FutureUtils
>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>> at org.apache.flink.streaming.api.operators.
>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>> at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>> ... 3 more
>>>>> Caused by: java.lang.ClassCastException:
>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>>> at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>>>>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>> at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>> at org.apache.flink.runtime.state.heap.
>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>> AsyncSnapshotCallable.java:75)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>> at org.apache.flink.runtime.concurrent.FutureUtils
>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>> ... 5 more
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>


Re: Heartbeat of TaskManager timed out.

2020-07-02 Thread Khachatryan Roman
Thanks, Ori

From the log, it looks like there IS a memory leak.

At 10:12:53 there was the last "successful" GC, when 13 GB was freed in
0.4653809 secs:
[Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap:
23280.3M(28960.0M)->10047.0M(28960.0M)]

Then the heap grew from 10G to 28G with GC not being able to free up enough
space:
[Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
12591.0M(28960.0M)->11247.0M(28960.0M)]
[Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
12103.0M(28960.0M)->11655.0M(28960.0M)]
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
12929.0M(28960.0M)->12467.0M(28960.0M)]
... ...
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
28042.6M(28960.0M)->27220.6M(28960.0M)]
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
28494.5M(28960.0M)->28720.6M(28960.0M)]
[Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
28944.6M(28960.0M)->28944.6M(28960.0M)]

Until 10:15:12, when a full GC freed almost 3G, but it took 51 seconds and the
heartbeat timed out:
2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
 28944M->26018M(28960M), 51.5256128 secs]
  [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
113556K->112729K(1150976K)]
  [Times: user=91.08 sys=0.06, real=51.53 secs]
2020-07-01T10:16:04.395+: [GC concurrent-mark-abort]
10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
JobManager with id bc59ba6a

No substantial amount of memory was freed after that.

If this memory usage pattern is expected, I'd suggest:
1. increasing the heap size
2. playing with the PrintStringDeduplicationStatistics and UseStringDeduplication
flags - probably string deduplication is making G1 slower than CMS
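
To make the heap figures above easier to check, here is a minimal Java sketch
(not part of Flink or of any JVM tooling) that extracts the
"Heap: before(capacity)->after(capacity)" numbers from G1 log entries and
reports how much each collection freed. The class name G1HeapLineSketch and
its helper methods are made up for illustration, and the pattern assumes that
sizes are printed in megabytes ("M"), as in the excerpts quoted here; other
units would need extra handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class G1HeapLineSketch {
    // Matches the "Heap: before(capacity)->after(capacity)" part of a G1 entry.
    // \s* tolerates the line break that mail clients insert after "Heap:".
    private static final Pattern HEAP = Pattern.compile(
        "Heap:\\s*([\\d.]+)M\\(([\\d.]+)M\\)->([\\d.]+)M\\(([\\d.]+)M\\)");

    /** Returns {beforeMb, afterMb, capacityMb} for every heap entry in the text. */
    static List<double[]> parse(String log) {
        List<double[]> entries = new ArrayList<>();
        Matcher m = HEAP.matcher(log);
        while (m.find()) {
            entries.add(new double[] {
                Double.parseDouble(m.group(1)),   // occupancy before the GC
                Double.parseDouble(m.group(3)),   // occupancy after the GC
                Double.parseDouble(m.group(4))}); // heap capacity after the GC
        }
        return entries;
    }

    /** Megabytes released by one collection (negative if occupancy grew). */
    static double freedMb(double[] entry) {
        return entry[0] - entry[1];
    }

    /** Fraction of the heap still occupied after the collection. */
    static double occupancyAfter(double[] entry) {
        return entry[1] / entry[2];
    }

    public static void main(String[] args) {
        // Three entries copied from the log excerpts in this mail.
        String log = String.join("\n",
            "[Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap:",
            "23280.3M(28960.0M)->10047.0M(28960.0M)]",
            "[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:",
            "28494.5M(28960.0M)->28720.6M(28960.0M)]",
            "[Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:",
            "28944.6M(28960.0M)->26018.9M(28960.0M)]");
        for (double[] entry : parse(log)) {
            System.out.printf("freed %.1f MB, %.0f%% occupied after GC%n",
                freedMb(entry), 100 * occupancyAfter(entry));
        }
    }
}
```

Run over a whole GC log, a loop like this makes the pattern visible at a
glance: collections that free little or nothing while occupancy stays close
to the heap capacity.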

Regards,
Roman


On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski  wrote:

> Hi,
>
> I'd be happy to :) Attached is a TaskManager log which timed out.
>
>
> Thanks!
>
> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song  wrote:
>
>> Maybe you can share the log and gc-log of the problematic TaskManager?
>> See if we can find any clue.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski  wrote:
>>
>>> I've found out that sometimes one of my TaskManagers experiences a GC
>>> pause of 40-50 seconds and I have no idea why.
>>> I profiled one of the machines using JProfiler and everything looks
>>> fine. No memory leaks, memory is low.
>>> However, I cannot anticipate which of the machines will get the 40-50
>>> seconds pause and I also cannot profile all of them all the time.
>>>
>>> Any suggestions?
>>>
>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song 
>>> wrote:
>>>
>>>> In Flink 1.10, there's a huge change in the memory management compared
>>>> to previous versions. This could be related to your observations, because
>>>> with the same configurations, it is possible that there's less JVM heap
>>>> space (with more off-heap memory). Please take a look at this migration
>>>> guide [1].
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>
>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski
>>>> wrote:

>>>>> Thanks for the suggestions!
>>>>>
>>>>> > i recently tried 1.10 and see this error frequently. and i dont have
>>>>> > the same issue when running with 1.9.1
>>>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>>>> occurrences of the heartbeat timeout
>>>>>
>>>>> > - Probably the most straightforward way is to try increasing the
>>>>> >   timeout to see if that helps. You can leverage the configuration
>>>>> >   option `heartbeat.timeout`[1]. The default is 50s.
>>>>> > - It might be helpful to share your configuration setups (e.g.,
>>>>> >   the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>> >   way is to share the beginning part of your JM/TM logs, including the
>>>>> >   JVM parameters and all the loaded configurations.
>>>>> > - You may want to look into the GC logs in addition to the
>>>>> >   metrics. In case of a CMS GC stop-the-world, you may not be able to
>>>>> >   see the most recent metrics due to the process not responding to the
>>>>> >   metric querying services.
>>>>> > - You may also look into the status of the JM process. If JM is
>>>>> >   under significant GC pressure, it could also happen that the heartbeat
>>>>> >   message from TM is not timely handled before the timeout check.
>>>>> > - Is there any metrics monitoring the network condition between
>>>>> >   the JM and timeouted TM? Possibly any jitters?
>>>>>
>>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>>> TaskManager is always the one with the 

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Khachatryan Roman
Thanks for the clarification.

Can you also share the code of other parts, particularly MyFunction?

Regards,
Roman


On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu  wrote:

> Rocksdb backend has the same problem
>
> Khachatryan Roman  于2020年7月2日周四 下午6:11写道:
>
>> Thanks for reporting this.
>>
>> Looks like the window namespace was replaced by VoidNamespace in state
>> entry.
>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>> further investigate it.
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu  wrote:
>>
>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>> evictor. The state is stored to memory.
>>>
>>> input.setParallelism(processParallelism)
>>> .assignTimestampsAndWatermarks(new UETimeAssigner)
>>> .keyBy(_.key)
>>> .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>> .trigger(new MyTrigger)
>>> .evictor(new MyEvictor)
>>> .process(new MyFunction).setParallelism(aggregateParallelism)
>>> .addSink(kafkaSink).setParallelism(sinkParallelism)
>>> .name("kafka-record-sink")
>>>
>>> And the exception stack is here, could anyone help with this? Thanks!
>>>
>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>> Window(TumblingEventTimeWindows(120), JoinTrigger, JoinEvictor,
>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>> at org.apache.flink.streaming.runtime.tasks.
>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>>> .java:1100)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> ThreadPoolExecutor.java:1149)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>> VoidNamespace
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at org.apache.flink.runtime.concurrent.FutureUtils
>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>> at org.apache.flink.streaming.api.operators.
>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>> ... 3 more
>>> Caused by: java.lang.ClassCastException:
>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>> at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
>>> VoidNamespaceSerializer.java:32)
>>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>> at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot
>>> .writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>> .callInternal(HeapSnapshotStrategy.java:191)
>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>> .callInternal(HeapSnapshotStrategy.java:158)
>>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>> AsyncSnapshotCallable.java:75)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at org.apache.flink.runtime.concurrent.FutureUtils
>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>> ... 5 more
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-02 Thread Khachatryan Roman
Thanks for reporting this.

Looks like the window namespace was replaced by VoidNamespace in state
entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further
investigate it.

Regards,
Roman


On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu  wrote:

> I'm using flink 1.9 on Mesos and I try to use my own trigger and evictor.
> The state is stored to memory.
>
> input.setParallelism(processParallelism)
> .assignTimestampsAndWatermarks(new UETimeAssigner)
> .keyBy(_.key)
> .window(TumblingEventTimeWindows.of(Time.minutes(20)))
> .trigger(new MyTrigger)
> .evictor(new MyEvictor)
> .process(new MyFunction).setParallelism(aggregateParallelism)
> .addSink(kafkaSink).setParallelism(sinkParallelism)
> .name("kafka-record-sink")
>
> And the exception stack is here, could anyone help with this? Thanks!
>
> java.lang.Exception: Could not materialize checkpoint 1 for operator
> Window(TumblingEventTimeWindows(120), JoinTrigger, JoinEvictor,
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
> .java:1100)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: java.lang.
> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
> TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
> FutureUtils.java:450)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer
> .<init>(OperatorSnapshotFinalizer.java:47)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> ... 3 more
> Caused by: java.lang.ClassCastException:
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast
> to org.apache.flink.runtime.state.VoidNamespace
> at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
> VoidNamespaceSerializer.java:32)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
> .writeState(CopyOnWriteStateMapSnapshot.java:114)
> at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot
> .writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
> .callInternal(HeapSnapshotStrategy.java:191)
> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
> .callInternal(HeapSnapshotStrategy.java:158)
> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
> AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
> FutureUtils.java:447)
> ... 5 more
>
> --
> Best regards
>
> Sili Liu
>


Re: Unable to run basic WordCount example program from flink tutorial

2020-06-02 Thread Khachatryan Roman
Dear Tom,

This is likely a Scala version issue.
Can you post your pom.xml?

Regards,
Roman


On Tue, Jun 2, 2020 at 6:34 PM Tom Burgert  wrote:

> Dear all,
>
> I am trying to set up flink and after hours I still fail to make a simple
> program run even though I follow every recommended step in the tutorials.
>
> My operating system is OSX (therefore everything was installed via brew)
> and I am using Maven as a build tool. I used the quick start script for
> scala to set up a new project.
> Then, I only did two things:
> 1) pasted the code of the word count object into the BatchJob.scala file (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/)
> 2) changed the mainClass in the pom.xml file from StreamJob to BatchJob
>
> Then, I build via "mvn clean package" and run the .jar file.
>
> When compiling, I receive the following warning:
>
> 
> Expected all dependencies to require Scala version: 2.11.12
>  org.scala-lang:scala-reflect:2.11.12 requires scala version: 2.11.12
>  org.apache.flink:flink-scala_2.11:1.10.0 requires scala version: 2.11.12
>  org.apache.flink:flink-scala_2.11:1.10.0 requires scala version: 2.11.12
>  org.scala-lang:scala-compiler:2.11.12 requires scala version: 2.11.12
>  org.scala-lang.modules:scala-xml_2.11:1.0.5 requires scala version: 2.11.7
> Multiple versions of scala libraries detected!
> 
>
> Which is weird, since the pom.xml file should keep all versions the same
> (at least the structure suggests so), and by playing around with the
> pom.xml file I could not prevent this warning. Either way, I could not find
> any information on the internet on whether the warning might be the reason
> for the error I am getting when running the jar file.
>
> The error I get when I run my .jar file is the following:
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 32d249e2b67ef0ecbc72bd164fe1dfb9)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 32d249e2b67ef0ecbc72bd164fe1dfb9)
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
> at org.myorg.quickstart.BatchJob$.main(BatchJob.scala:19)
> at org.myorg.quickstart.BatchJob.main(BatchJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 8 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 32d249e2b67ef0ecbc72bd164fe1dfb9)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
> 

Re: User / Job Manager (permissions) for Flink

2020-06-02 Thread Khachatryan Roman
Hi David,

One option is Ververica Platform which has a notion of Namespaces:
https://docs.ververica.com/administration/namespaces.html

I guess Konstantin can tell you more about it.

Disclaimer: I work for a company that develops this product.

Regards,
Roman


On Tue, Jun 2, 2020 at 5:37 PM David Magalhães 
wrote:

> Hi, not sure if this was discussed (in a brief search I couldn't find
> anything), but I would like to know if there is an application that uses
> the Flink REST API to provide some kind of user management, e.g. allowing a
> certain user to log in and manage some jobs running in Flink, limiting the
> parallelism for each job, etc.
>
> The idea is that certain users can only access their own jobs and can
> stop/start them, but can't interfere with other jobs that run on the
> same cluster.
>
> Thanks
>


Re: Connection unexpectedly closed by remote task manager

2020-06-02 Thread Khachatryan Roman
Hi,

Can you check if there are any failures on the task manager mentioned in the error
message (ip-10-210-5-104.ap-south-1.compute.internal/10.210.5.104:42317)?

Regards,
Roman


On Tue, Jun 2, 2020 at 10:18 AM ApoorvK 
wrote:

> I have a Flink job running on version 1.8.2 with a parallelism of 12. I took
> a savepoint of the application on disk; it is approx. 70 GB. Now, when
> I run the application from this particular savepoint, checkpoints keep
> failing and the app cannot restart. I am getting the following error.
> Kindly let me know what I can do to fix this.
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager
> 'ip-10-210-5-104.ap-south-1.compute.internal/10.210.5.104:42317'. This might
> indicate that the remote task manager was lost.
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
> at
>
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
> at
>
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at java.lang.Thread.run(Thread.java:745)
> final-order-object-map -> Sink: elastic-search-sink (5/12)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Rest Api body size

2020-06-02 Thread Khachatryan Roman
Hi White,

Did you try to increase rest.client.max-content-length [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#advanced-options-for-the-rest-endpoint-and-client

Regards,
Roman


On Mon, Jun 1, 2020 at 8:01 AM snack white  wrote:

> Hi,
> When I submit a job via the REST API, my POST body is 25.83 KB
> (my jar had been uploaded to the job manager beforehand), and the body is
> cut off in the Flink job manager. Can someone tell me how to raise the POST
> body length limit?
> Thanks,
> White


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread Khachatryan Roman
Hello Hemant,

Thanks for your reply.

I think the partitioning you described (event type + protocol type) is subject
to data skew. Including a device ID should solve this problem.
Also, including "protocol_type" in the key and having a topic per
protocol_type seems redundant.

Furthermore, do you have any particular reason to maintain multiple topics?
I could imagine protocols have different speeds or other characteristics,
so you can tune Flink accordingly.
Otherwise, having a single topic partitioned only by device ID would
simplify deployment and reduce data skew.
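
To illustrate the skew argument, here is a small Java sketch under stated
assumptions. It does not use the Kafka client: partitionFor is a simplified
stand-in (hashCode modulo the partition count), whereas Kafka's default
partitioner hashes the serialized key with murmur2. The event types, the
"mqtt" protocol name, the device IDs, and the partition count of 12 are all
made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionSkewSketch {
    /** Simplified stand-in for a hash partitioner (NOT Kafka's murmur2-based one). */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Counts how many records land in each partition for the given keys. */
    static int[] countPerPartition(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    /** Number of partitions that received at least one record. */
    static int usedPartitions(int[] counts) {
        int used = 0;
        for (int c : counts) {
            if (c > 0) {
                used++;
            }
        }
        return used;
    }

    /** One record per (event type, device): 3 hypothetical event types x 100 devices. */
    static List<String> sampleKeys(boolean keyByDevice) {
        String[] eventTypes = {"temperature", "pressure", "humidity"};
        List<String> keys = new ArrayList<>();
        for (String eventType : eventTypes) {
            for (int device = 0; device < 100; device++) {
                keys.add(keyByDevice ? "device-" + device : eventType + "|mqtt");
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        int partitions = 12;
        int[] byEventAndProtocol = countPerPartition(sampleKeys(false), partitions);
        int[] byDevice = countPerPartition(sampleKeys(true), partitions);
        System.out.println("event_type + protocol: " + Arrays.toString(byEventAndProtocol));
        System.out.println("device ID:             " + Arrays.toString(byDevice));
    }
}
```

With only three distinct keys, at most three of the twelve partitions can
receive data, whatever the hash function is; keying by device ID gives the
partitioner many more distinct values to spread, and it keeps all events of
one device in the same partition.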

> By consume do you mean the downstream system?
Yes.

Regards,
Roman


On Mon, May 11, 2020 at 11:30 PM hemant singh  wrote:

> Hello Roman,
>
> Please find my responses below.
>
> As I understand, each protocol has a distinct set of event types (where
> event type == metrics type); and a distinct set of devices. Is this correct?
> Yes, correct: distinct events and devices. Each device emits these events.
>
> > Based on data protocol I have 4-5 topics. Currently the data for a
> single event is being pushed to a partition of the kafka topic(producer key
> -> event_type + data_protocol).
> Here you are talking about the source (to Flink job), right?
> Yes, you are right.
>
> Can you also share how are you going to consume these data?
> By consume do you mean the downstream system?
> If yes, then this data will be written to a DB; some metrics go to a
> TSDB (Influx) as well.
>
> Thanks,
> Hemant
>
> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Hemant,
>>
>> As I understand, each protocol has a distinct set of event types (where
>> event type == metrics type); and a distinct set of devices. Is this correct?
>>
>> > Based on data protocol I have 4-5 topics. Currently the data for a
>> single event is being pushed to a partition of the kafka topic(producer key
>> -> event_type + data_protocol).
>> Here you are talking about the source (to Flink job), right?
>>
>> Can you also share how are you going to consume these data?
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 8:57 PM hemant singh 
>> wrote:
>>
>>> Hi,
>>>
>>> I have different events from a device, which constitute different
>>> metrics for the same device. Each of these events is produced by the device
>>> at intervals ranging from a few milliseconds to a minute.
>>>
>>> Event1(Device1) -> Stream1 -> Metric 1
>>> Event2 (Device1) -> Stream2 -> Metric 2 ...
>>> ..
>>> ...
>>> Event100(Device1) -> Stream100 -> Metric100
>>>
>>> The number of events can go up to a few hundred for each data protocol, and
>>> we have around 4-5 data protocols. Metrics from different streams make up a
>>> record. For example, for device 1 above:
>>>
>>> Device1 -> Metric1, Metric 2, Metric15 forms a single record for the
>>> device. Currently, in the development phase, I am using an interval join to
>>> achieve this, that is, to create a record with the latest data from different
>>> streams (events).
>>>
>>> Based on data protocol I have 4-5 topics. Currently the data for a
>>> single event is being pushed to a partition of the kafka topic(producer key
>>> -> event_type + data_protocol). So essentially one topic is made up of many
>>> streams. I am filtering on the key to define the streams.
>>>
>>> My question is: is this the correct way to stream the data? I had thought
>>> of maintaining a separate topic per event; however, in that case the number
>>> of topics could reach a few thousand, which becomes a little
>>> challenging to maintain, and I am not sure if Kafka handles that well.
>>>
>>> I know there are traditional ways to do this, like pushing the data to a
>>> time-series DB and then joining data for different metrics, but that is
>>> something which will never scale; also, this processing should be as
>>> real-time as possible.
>>>
>>> Are there better ways to handle this use case, or am I on the correct path?
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: Broadcast state vs data enrichment

2020-05-12 Thread Khachatryan Roman
Thanks for the clarification.

Apparently, the second option (with enricher) creates more load by adding
configuration to every event. Unless events are much bigger than the
configuration, this will significantly increase network, memory, and CPU
usage.
Btw, I think you don't need a broadcast in the 2nd option, since the
interested subtask will receive the configuration anyways.
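
A rough back-of-the-envelope sketch of the extra traffic on the enricher -> operator1 hop when the configuration is attached to every event. The event rate and byte sizes are made-up assumptions for illustration, not measurements, and the class and method names are hypothetical.

```java
/** Rough estimate of the extra bytes shipped when config is attached to every event. */
final class EnrichmentOverhead {

    /**
     * Extra bytes per second sent downstream when a config payload of
     * configBytes is appended to each of eventsPerSecond events.
     */
    static long extraBytesPerSecond(long eventsPerSecond, long configBytes) {
        return eventsPerSecond * configBytes;
    }

    /** Relative growth of a single event, e.g. 0.5 means the event becomes 50% larger. */
    static double relativeGrowth(long eventBytes, long configBytes) {
        return (double) configBytes / eventBytes;
    }

    public static void main(String[] args) {
        long eventsPerSecond = 100_000; // assumed event rate
        long eventBytes = 200;          // assumed serialized event size
        long configBytes = 100;         // assumed serialized config size

        System.out.println("extra bytes/s: " + extraBytesPerSecond(eventsPerSecond, configBytes));
        System.out.println("event growth:  " + relativeGrowth(eventBytes, configBytes));
    }
}
```

With broadcast state, by contrast, the configuration crosses the network only when it changes, which the original post describes as very sporadic.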

Regards,
Roman


On Tue, May 12, 2020 at 5:57 AM Manas Kale  wrote:

> Sure. Apologies for not making this clear enough.
>
> > each operator only stores what it needs.
> Let's imagine this setup:
>
> BROADCAST STREAM
> config-stream ----------------------------------
>                  |            |            |
> event-stream--> operator1--> operator2--> operator3
>
>
> In this scenario, all 3 operators will be BroadcastProcessFunctions. Each
> of them will receive the whole config message in their
> processBroadcastElement method, but each one will only store what it
> needs in their state store. So even though operator1 will receive
>  config = {
> "config1" : 1,
> "config2" : 2,
> "config3" : 3
> }
> it will only store config1.
>
> > each downstream operator will "strip off" the config parameter that it
> needs.
>
> BROADCAST STREAM
> config-stream ----
>                  |
> event-stream--> enricher --> operator1--> operator2--> operator3
>
> In this case, the enricher operator will store the whole config message.
> When an event message arrives, this operator will append config1, config2
> and config3 to it. Operator 1 will extract and use config1, and output a
> message that has config1 stripped off.
>
> I hope that helps!
>
> Perhaps I am being too pedantic, but I would like to know whether these two
> methods differ noticeably in performance and, if so, which one would
> be preferred.
>
>
>
>
> On Mon, May 11, 2020 at 11:46 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Manas,
>>
>> The approaches you described look the same:
>> > each operator only stores what it needs.
>> > each downstream operator will "strip off" the config parameter that it
>> > needs.
>>
>> Can you please explain the difference?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 8:07 AM Manas Kale  wrote:
>>
>>> Hi,
>>> I have a single broadcast message that contains configuration data
>>> consumed by different operators. For example:
>>> config = {
>>> "config1" : 1,
>>> "config2" : 2,
>>> "config3" : 3
>>> }
>>>
>>> Operator 1 will consume config1 only, operator 2 will consume config2
>>> only etc.
>>>
>>>
>>> - Right now in my implementation the config message gets broadcast
>>>   over operators 1,2,3 and each operator only stores what it needs.
>>>
>>> - A different approach would be to broadcast the config message to a
>>>   single root operator. This will then enrich event data flowing through it
>>>   with config1,config2 and config3 and each downstream operator will "strip
>>>   off" the config parameter that it needs.
>>>
>>>
>>> *I was wondering which approach would be the best to go with
>>> performance-wise.* I don't really have the time to implement both and
>>> compare, so perhaps someone here already knows if one approach is better or
>>> both provide similar performance.
>>>
>>> FWIW, the config stream is very sporadic compared to the event stream.
>>>
>>> Thank you,
>>> Manas Kale
>>>
>>>
>>>
>>>


Re: Not able to implement an usecase

2020-05-12 Thread Khachatryan Roman
AFAIK, yes, you can write streams.

I'm pulling in Jingsong Li and Rui Li as they might know better.

Regards,
Roman


On Mon, May 11, 2020 at 10:21 PM Jaswin Shah 
wrote:

> If I go with the Table API, can I write the streams to Hive, or is it only
> for batch processing as of now?
>
>
> ------
> *From:* Khachatryan Roman 
> *Sent:* Tuesday, May 12, 2020 1:49:10 AM
> *To:* Jaswin Shah 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Not able to implement an usecase
>
> Hi Jaswin,
>
> Currently, DataStream API doesn't support outer joins.
> As a workaround, you can use coGroup function [1].
>
> Hive is also not supported by DataStream API though it's supported by
> Table API [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 6:03 PM Jaswin Shah 
> wrote:
>
> Hi,
> I want to implement the following use case in my application:
> I am doing an interval join between two data streams and then, in a process
> function, catching the discrepant results of the join. The join is done on
> the key orderId. Now I want to identify all the messages in both data streams
> that are not joined. That is, if for a message in the left stream I do not
> find any message in the right stream within the defined interval, that
> message should be caught; likewise, messages in the right stream that have
> no corresponding message in the left stream should be caught. I need help
> achieving this. I know it can be done with an outer join, but as far as I
> know, interval joins and tumbling event-time window joins only support inner
> joins. I do not want to use the Table/SQL API here; I want to stay with the
> DataStream API only.
>
> Currently I am using the code below, which works for 90% of the cases, but
> the 10% of cases where a large delay can happen and messages in the left or
> right stream are missing are not handled by this implementation:
>
> /**
>  * Join cart and pg streams on mid and orderId, and the interval specified.
>  *
>  * @param leftStream
>  * @param rightStream
>  * @return
>  */
> public SingleOutputStreamOperator intervalJoinCartAndPGStreams(
>         DataStream leftStream, DataStream rightStream, ParameterTool parameter) {
>     // Discrepant results are sent to Kafka from CartPGProcessFunction.
>     return leftStream
>         .keyBy(new CartJoinColumnsSelector())
>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>         .between(
>             Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
>             Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>         .process(new CartPGProcessFunction());
> }
>
>
>
> Secondly, I am unable to find streaming support for writing the data streams
> I am reading from Kafka to Hive, where I want to batch process them with
> Flink.
>
> Please help me resolve these use cases.
>
> Thanks,
> Jaswin
>
>
>
>


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-11 Thread Khachatryan Roman
Hi Hemant,

As I understand, each protocol has a distinct set of event types (where
event type == metrics type); and a distinct set of devices. Is this correct?

> Based on data protocol I have 4-5 topics. Currently the data for a single
> event is being pushed to a partition of the kafka topic(producer key ->
> event_type + data_protocol).
Here you are talking about the source (to the Flink job), right?

Can you also share how you are going to consume this data?


Regards,
Roman


On Mon, May 11, 2020 at 8:57 PM hemant singh  wrote:

> Hi,
>
> I have different events from a device, which constitute different metrics
> for the same device. Each of these events is produced by the device at an
> interval of a few milliseconds to a minute.
>
> Event1(Device1) -> Stream1 -> Metric 1
> Event2 (Device1) -> Stream2 -> Metric 2 ...
> ..
> ...
> Event100(Device1) -> Stream100 -> Metric100
>
> The number of events can go up to a few hundred for each data protocol, and
> we have around 4-5 data protocols. Metrics from different streams make up a
> record; for example, for device 1 above:
>
> Device1 -> Metric1, Metric 2, Metric15 forms a single record for the
> device. Currently, in the development phase, I am using an interval join to
> achieve this, that is, to create a record with the latest data from
> different streams (events).
>
> Based on data protocol I have 4-5 topics. Currently the data for a single
> event is being pushed to a partition of the Kafka topic (producer key ->
> event_type + data_protocol). So essentially one topic is made up of many
> streams. I am filtering on the key to define the streams.
>
> My question is: is this the correct way to stream the data? I had thought of
> maintaining a different topic per event; however, in that case the number of
> topics could grow to a few thousand, which becomes a little
> challenging to maintain, and I am not sure Kafka handles that well.
>
> I know there are traditional ways to do this, like pushing the data to a
> time-series DB and then joining data for different metrics, but that
> will never scale; also, this processing should be as
> real-time as possible.
>
> Are there better ways to handle this use case, or am I on the correct path?
>
> Thanks,
> Hemant
>


Re: Not able to implement an usecase

2020-05-11 Thread Khachatryan Roman
Hi Jaswin,

Currently, DataStream API doesn't support outer joins.
As a workaround, you can use coGroup function [1].

Hive is also not supported by DataStream API though it's supported by Table
API [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
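
To make the coGroup workaround more concrete, here is a plain-Java model of the logic that could go into the body of a coGroup function. It is only a sketch: it uses no Flink classes, the class name and the LEFT_ONLY/RIGHT_ONLY/JOINED labels are hypothetical, and it assumes both inputs have already been grouped by key and window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model of a coGroup body used as a full outer join.
 * For one key and one window, both sides are handed over together;
 * an empty side means the other side has no join partner.
 */
final class OuterJoinViaCoGroup {

    static List<String> coGroup(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        if (right.isEmpty()) {
            // left elements without a match on the right
            for (String l : left) {
                out.add("LEFT_ONLY(" + l + ")");
            }
        } else if (left.isEmpty()) {
            // right elements without a match on the left
            for (String r : right) {
                out.add("RIGHT_ONLY(" + r + ")");
            }
        } else {
            // both sides present: emit every pair, like an inner join
            for (String l : left) {
                for (String r : right) {
                    out.add("JOINED(" + l + "," + r + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(Arrays.asList("cart-1"), Arrays.asList("pg-1")));
        System.out.println(coGroup(Arrays.asList("cart-2"), Collections.<String>emptyList()));
        System.out.println(coGroup(Collections.<String>emptyList(), Arrays.asList("pg-3")));
    }
}
```

In a Flink job this logic would sit in CoGroupFunction.coGroup(first, second, out), applied via leftStream.coGroup(rightStream).where(...).equalTo(...).window(...).apply(...). Note that coGroup is window-based, so its matching semantics are not identical to those of an interval join.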

Regards,
Roman


On Mon, May 11, 2020 at 6:03 PM Jaswin Shah  wrote:

> Hi,
> I want to implement the following use case in my application:
> I am doing an interval join between two data streams and then, in a process
> function, catching the discrepant results of the join. The join is done on
> the key orderId. Now I want to identify all the messages in both data streams
> that are not joined. That is, if for a message in the left stream I do not
> find any message in the right stream within the defined interval, that
> message should be caught; likewise, messages in the right stream that have
> no corresponding message in the left stream should be caught. I need help
> achieving this. I know it can be done with an outer join, but as far as I
> know, interval joins and tumbling event-time window joins only support inner
> joins. I do not want to use the Table/SQL API here; I want to stay with the
> DataStream API only.
>
> Currently I am using the code below, which works for 90% of the cases, but
> the 10% of cases where a large delay can happen and messages in the left or
> right stream are missing are not handled by this implementation:
>
> /**
>  * Join cart and pg streams on mid and orderId, and the interval specified.
>  *
>  * @param leftStream
>  * @param rightStream
>  * @return
>  */
> public SingleOutputStreamOperator intervalJoinCartAndPGStreams(
>         DataStream leftStream, DataStream rightStream, ParameterTool parameter) {
>     // Discrepant results are sent to Kafka from CartPGProcessFunction.
>     return leftStream
>         .keyBy(new CartJoinColumnsSelector())
>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>         .between(
>             Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
>             Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>         .process(new CartPGProcessFunction());
> }
>
>
>
> Secondly, I am unable to find streaming support for writing the data streams
> I am reading from Kafka to Hive, where I want to batch process them with
> Flink.
>
> Please help me resolve these use cases.
>
> Thanks,
> Jaswin
>
>
>


Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Khachatryan Roman
Hi Jacky,

Did you try it without  -XX:LogFile=${FLINK_LOG_PREFIX}.jit ?
Probably, Flink can't write to this location.

Also, you can try other tools described at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/application_profiling.html

Regards,
Roman


On Mon, May 11, 2020 at 5:02 PM Jacky D  wrote:

> Hi all,
>
> I've encountered a memory issue with my Flink job on AWS EMR (current Flink
> version 1.6.2). I would like to find the root cause, so I'm trying JITWatch
> on my local standalone cluster, but I cannot use it on EMR. After I added the
> following config to flink-conf.yaml:
>
> env.java.opts: "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading
> -XX:+LogCompilation -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
>
> I got this error:
>
> 2020-05-07 16:24:53,368 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while running the Flink Yarn session.
> java.lang.reflect.UndeclaredThrowableException
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1862)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:813)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:429)
>     at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:610)
>     at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:813)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>     ... 2 more
> Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
>
> How can I fix this issue to enable JITWatch, or which tool would be suitable
> for analyzing a Flink memory dump on EMR?
>
> Thanks
> Jacky Du
>


Re: Testing jobs locally agains secure Hadoop cluster

2020-05-11 Thread Khachatryan Roman
Hi Őrhidi,

Can you please provide some details about the errors you get?

Regards,
Roman


On Mon, May 11, 2020 at 9:32 AM Őrhidi Mátyás 
wrote:

> Dear Community,
>
> I'm having trouble testing jobs against a secure Hadoop cluster. Is that
> possible? The mini cluster seems to not load any security modules.
>
> Thanks,
> Matyas
>


Re: Blink Planner fails to generate RowtimeAttribute for custom TableSource

2020-05-11 Thread Khachatryan Roman
Hi Yuval,

Thanks for reporting this issue. I'm pulling in Timo and Jark who are
working on the SQL component. They might be able to help you with your
problem.

Regards,
Roman


On Mon, May 11, 2020 at 9:10 AM Yuval Itzchakov  wrote:

> Hi,
> While migrating from Flink 1.9 -> 1.10 and from the old planner to Blink,
> I've run into an issue where the Blink Planner doesn't take into account
> the RowtimeAttribute defined on a custom table source. I've opened an
> issue: https://issues.apache.org/jira/browse/FLINK-17600 and was
> wondering whether anyone else has run into this.
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Broadcast state vs data enrichment

2020-05-11 Thread Khachatryan Roman
Hi Manas,

The approaches you described look the same:
> each operator only stores what it needs.
> each downstream operator will "strip off" the config parameter that it
> needs.

Can you please explain the difference?

Regards,
Roman


On Mon, May 11, 2020 at 8:07 AM Manas Kale  wrote:

> Hi,
> I have a single broadcast message that contains configuration data
> consumed by different operators. For example:
> config = {
> "config1" : 1,
> "config2" : 2,
> "config3" : 3
> }
>
> Operator 1 will consume config1 only, operator 2 will consume config2 only
> etc.
>
>
> - Right now in my implementation the config message gets broadcast
>   over operators 1,2,3 and each operator only stores what it needs.
>
> - A different approach would be to broadcast the config message to a
>   single root operator. This will then enrich event data flowing through it
>   with config1,config2 and config3 and each downstream operator will "strip
>   off" the config parameter that it needs.
>
>
> *I was wondering which approach would be the best to go with
> performance-wise.* I don't really have the time to implement both and
> compare, so perhaps someone here already knows if one approach is better or
> both provide similar performance.
>
> FWIW, the config stream is very sporadic compared to the event stream.
>
> Thank you,
> Manas Kale
>
>
>
>


Re: MongoSink

2020-05-11 Thread Khachatryan Roman
Hi Aissa,

What is the BSONWritable that you pass from the map to the sink?
I guess it's not serializable, which causes Flink to fall back to Kryo, which
then fails.

Regards,
Roman


On Sun, May 10, 2020 at 10:42 PM Aissa Elaffani 
wrote:

> Hello Guys,
> I am trying to sink my data to MongoDB, but I got some errors. I am
> sharing my MongoDB implementation and the errors that occurred. I
> hope someone can figure it out.
> Thank you for your time, I really appreciate it.
> Aissa
>


Re: Some question about flink temp files

2020-03-20 Thread Khachatryan Roman
Hi Reo,

Please find the answers to your questions below.

> 1, What are these tmp files used for?
These files are used by Flink internally for things like caching state
locally, storing jars and so on. They are not intended for the end user.
> 2, Does Flink have any mechanism to manage these files?
Yes, Flink creates them and deletes them when they are no longer used.
> 3, Is there any situation in which I need to delete useless files manually?
No.
> 4, I know I can specify the temp file path with the "io.tmp.dirs" config for
> a Flink cluster. But when I use LocalEnvironment, how can I configure this
> path instead of using the default path ("/tmp")?
Please use the "java.io.tmpdir" *system property* for standalone deployment [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html
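
A minimal sketch for checking which temp directory the JVM will use. It assumes the property is passed at JVM launch with -D; changing it later from code may have no effect on classes that have already read it. The class name and the example path are hypothetical.

```java
import java.io.File;

/** Prints the JVM temp directory, i.e. the value of the java.io.tmpdir system property. */
final class TmpDirCheck {

    /** Returns the value of the java.io.tmpdir system property for this JVM. */
    static String tmpDir() {
        return System.getProperty("java.io.tmpdir");
    }

    public static void main(String[] args) {
        // Launch with e.g.:  java -Djava.io.tmpdir=/data/flink-tmp TmpDirCheck
        // (/data/flink-tmp is a placeholder path, not a recommended location)
        File dir = new File(tmpDir());
        System.out.println("temp dir: " + dir + ", writable: " + dir.canWrite());
    }
}
```

Running this with the same -D option as the job's JVM is a quick way to confirm that the directory exists and is writable before starting the local environment.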

Regards,
Roman


On Fri, Mar 20, 2020 at 11:55 AM Reo Lei  wrote:

> Hi all,
>
> Recently, I found that Flink tmp files (localState,
> blobStore-*, flink-dist-cache-*, flink-io-*, flink-netty-shuffle-*, etc.) have
> grown to a total of about 6 GB. I have no idea what these files are used
> for or how big they will grow.
>
> So, I have some questions about Flink's temp files:
>
> 1, What are these tmp files used for?
> 2, Does Flink have any mechanism to manage these files?
> 3, Is there any situation in which I need to delete useless files manually?
> 4, I know I can specify the temp file path with the "io.tmp.dirs" config for
> a Flink cluster. But when I use LocalEnvironment, how can I configure this
> path instead of using the default path ("/tmp")?
>
> Thanks.
>
> BR,
> Reo
>


Re: Savepoint Location from Flink REST API

2020-03-20 Thread Khachatryan Roman
Hey Aaron,

You can use /jobs/:jobid/savepoints/:triggerid to get the location once the
savepoint is completed.

Please see
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/index.html?org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html
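
As an illustration, the status URL for that endpoint could be assembled as follows. This is only a sketch: the helper name, the base URL and the placeholder IDs are assumptions, and the fields of the JSON response are not reproduced here (see the handler documentation linked above).

```java
/** Builds the savepoint status URL for the /jobs/:jobid/savepoints/:triggerid endpoint. */
final class SavepointStatusUrl {

    /**
     * @param restBase  base URL of the Flink REST endpoint, e.g. http://localhost:8081 (assumed)
     * @param jobId     ID of the job the savepoint was triggered for
     * @param triggerId trigger ID returned when the savepoint was requested
     */
    static String statusUrl(String restBase, String jobId, String triggerId) {
        // drop a trailing slash so the path is not doubled
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return base + "/jobs/" + jobId + "/savepoints/" + triggerId;
    }

    public static void main(String[] args) {
        // Both IDs below are placeholders.
        System.out.println(statusUrl("http://localhost:8081", "<jobid>", "<triggerid>"));
    }
}
```

A client would then poll this URL with GET until the operation is reported as completed and read the savepoint location from the response.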

Meanwhile, I've created an issue to update the docs:
https://issues.apache.org/jira/browse/FLINK-16696

Regards,
Roman


On Fri, Mar 20, 2020 at 5:09 AM Aaron Langford 
wrote:

> Hey Flink Community,
>
> I'm combing through docs right now, and I don't see that a savepoint
> location is returned or surfaced anywhere. When I do this in the CLI, I get
> a nice message that tells me where in S3 it put my savepoint (unique
> savepoint ID included). I'm looking for that same result to be available
> via the REST API. Does this exist today?
>
> Aaron
>


Re: Writing retract streams to Kafka

2020-03-05 Thread Khachatryan Roman
Hi Gyula,

Could you provide the code of your Flink program, the error with stacktrace
and the Flink version?

Thanks,
Roman


On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:

> Hi All!
>
> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
> trying to play around with it implementing and running a few use-cases.
>
> I have a simple window join + aggregation, grouped on some id that I want
> to write to Kafka but I am hitting the following error:
>
> "AppendStreamTableSink requires that Table has only insert changes."
>
> If I understand correctly the problem here is that since updates are
> possible within a single group, we have a retract stream and the Kafka Sink
> cannot handle that. I tried to search for the solution but I haven't found
> any satisfying answers.
>
> How can I simply tell the INSERT logic to ignore previous values and just
> always keep sending the latest (like you would see it on the CLI output).
>
> Thank you!
> Gyula
>


Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Khachatryan Roman
Hi Felipe,

Please find the answers to your questions below.

> Each "operator_subtask_index" means each instance of the parallel
> physical operator, doesn't it?
Yes.
> How can I set a fixed ID for the "operator_id" in my code so I can
> identify quickly which operator I am measuring?
You are using the correct API (uid(...)).
> What is the hash function used so I can identify my operator?
Flink uses
https://guava.dev/releases/18.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)

Regards,
Roman


On Thu, Mar 5, 2020 at 12:45 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I am tracking the latency of operators in Flink according to this
> reference [1]. When I am using Prometheus+Grafana I can issue a query using
> "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency"
> and I can check the percentiles of each "operator_id" and each
> "operator_subtask_index". Each "operator_subtask_index" means each instance
> of the parallel physical operator, doesn't it?
>
> How can I set a fixed ID for the "operator_id" in my code so I can
> identify quickly which operator I am measuring? I used "map(new
> MyMapUDF()).uid('my-operator-ID')" but it seems that there is a hash
> function that converts the string to a hash value. What is the hash
> function used so I can identify my operator? I know that I can use the Rest
> API [2] and if I name my operator it will have always the same hash when I
> restart the job, but I would like to set its name.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rest-api-integration
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


Re: Getting javax.management.InstanceAlreadyExistsException when upgraded to 1.10

2020-02-28 Thread Khachatryan Roman
Hi John,

Sorry for the late reply.

I'd assume that this is a separate issue.
Regarding the original one, I'm pretty sure it's
https://issues.apache.org/jira/browse/FLINK-8093

Regards,
Roman


On Wed, Feb 26, 2020 at 5:50 PM John Smith  wrote:

> Just curious, is this also the reason why some jobs in the UI show their
> metrics and others do not?
>
> Looking at 2 jobs, one displays how many bytes in and out it has received,
> while the other shows all zeros. But I know it's working.
>
> On Wed, 26 Feb 2020 at 11:19, John Smith  wrote:
>
>> This is what I got from the logs.
>>
>> 2020-02-25 00:13:38,124 WARN  org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>>
>> On Tue, 25 Feb 2020 at 15:50, John Smith  wrote:
>>
>>> Ok as soon as I can tomorrow.
>>>
>>> Thanks
>>>
>>> On Tue, 25 Feb 2020 at 11:51, Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> Seems like this is another instance of
>>>> https://issues.apache.org/jira/browse/FLINK-8093
>>>> Could you please provide the full stacktrace?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Mon, Feb 24, 2020 at 10:48 PM John Smith 
>>>> wrote:
>>>>
>>>>> Hi. I just upgraded to 1.10.0 and am getting the error below when I
>>>>> deploy my tasks.
>>>>>
>>>>> The first one seems to deploy OK, but subsequent ones seem to throw
>>>>> this error. They still seem to work, though.
>>>>>
>>>>> javax.management.InstanceAlreadyExistsException:
>>>>> kafka.consumer:type=app-info,id=consumer-2
>>>>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>>>> at
>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>>> at
>>>>> com.sun.jmx.interceptor

Re: state schema evolution for case classes

2020-02-26 Thread Khachatryan Roman
Hi Apoorv,

You can achieve this by implementing custom serializers for your state.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html

Regards,
Roman


On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Hi Roman,
>
> I have successfully migrated to Flink 1.8.2 with the savepoint created by
> Flink 1.6.2.
> Now I have to modify a few case classes due to a new requirement. I
> created a savepoint, and when I run the app with the modified classes from
> that savepoint, it throws the error "state not compatible".
> Previously no serializer was used.
> I now wish to support state schema evolution, so I need suggestions on how I
> can achieve that.
>
> Regards
>
> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi ApoorvK,
>>
>> I understand that you have a savepoint created by Flink 1.6.2 and you
>> want to use it with Flink 1.8.2. The classes themselves weren't modified.
>> Is that correct?
>> Which serializer did you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Earlier we developed on Flink 1.6.2, so there are lots of case classes
>>> that have Maps and nested case classes within them, for example:
>>>
>>> case class MyCaseClass(var a: Boolean,
>>>                        var b: Boolean,
>>>                        var c: Boolean,
>>>                        var d: NestedCaseClass,
>>>                        var e: Int) {
>>>   // no-arg constructor: one argument per field (three Booleans, d, e)
>>>   def this() = this(false, false, false, new NestedCaseClass, 0)
>>> }
>>>
>>>
>>> Now that we have migrated to Flink 1.8.2, I need help figuring out how I
>>> can achieve state schema evolution for such classes.
>>>
>>> 1. Will creating Avro schemas for these classes now and implementing Avro
>>> serialisation work?
>>> 2. Or should I register a Kryo serializer with a protobuf serializer on the
>>> env?
>>>
>>> Please suggest what can be done here, or point me to an Avro
>>> serialisation example.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Timeseries aggregation with many IoT devices off of one Kafka topic.

2020-02-25 Thread Khachatryan Roman
Hi,

I think conceptually the pipeline could look something like this:
env
  .addSource(...)
  .keyBy("device_id")
  .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(10)))
  .trigger(new Trigger {
    def onElement(el, timestamp, window, ctx) = {
      // register a timer only for the window that starts in the element's own slide
      if (window.start == TimeWindow.getWindowStartWithOffset(timestamp, 0, 10_000)) {
        ctx.registerEventTimeTimer(window.end)
      }
      TriggerResult.CONTINUE
    }
    def onEventTime(time, window, ctx) = {
      TriggerResult.FIRE
    }
  })
  .aggregate(...)

(the 10 s slide needs to be adjusted)

Regards,
Roman


On Tue, Feb 25, 2020 at 3:44 PM Avinash Tripathy <
avinash.tripa...@stellapps.com> wrote:

> Hi Theo,
>
> We also have the same scenario. It would be great if you could provide
> some examples or more details about the Flink process function.
>
> Thanks,
> Avinash
>
> On Tue, Feb 25, 2020 at 12:29 PM theo.diefent...@scoop-software.de <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi,
>>
>> At the last Flink Forward in Berlin I spoke with some people about the same
>> problem, where they had construction devices as IoT sensors which could
>> even be offline for multiple days.
>>
>> They told me that the major problem for them was that the watermark in
>> Flink is maintained per operator/subtask, even if you group by key.
>>
>> They solved their problem via a Flink process function where they have
>> full control over state and timers, so you can deal with each device as you
>> like and can e.g. maintain something similar to a per-device watermark. I
>> also think that it is the best way to go for this use case.
>>
>> Best regards
>> Theo
>>
>>
>>
>>
>> -------- Original message --------
>> From: hemant singh
>> Date: Tue, Feb 25, 2020, 06:19
>> To: Marco Villalobos
>> Cc: user@flink.apache.org
>> Subject: Re: Timeseries aggregation with many IoT devices off of one
>> Kafka topic.
>>
>> Hello,
>>
>> I am also working on something similar. Below is the pipeline design I
>> have; I am sharing it in case it is helpful.
>>
>> topic -> keyed stream on device-id -> window operation -> sink.
>>
>> You can PM me on further details.
>>
>> Thanks,
>> Hemant
>>
>> On Tue, Feb 25, 2020 at 1:54 AM Marco Villalobos 
>> wrote:
>>
>> I need to collect timeseries data from thousands of IoT devices. Each
>> device has a name, value, and timestamp published to one Kafka topic. The
>> event-time timestamps are in order only in relation to the individual
>> device, but out of order with respect to other devices.
>>
>> Is there a way to aggregate a 15 minute window of this in which each IoT
>> device gets aggregated with its own event time?
>>
>> I would deeply appreciate it if somebody could guide me to an approach for
>> solving this in Flink.
>>
>> I wish there was a group chat for these types of problems.
>>
>>
>>
>>


Re: Map Of DataStream getting NullPointer Exception

2020-02-25 Thread Khachatryan Roman
As I understand from the code, streamMap is a Java map, not a Scala one. So
you can get an NPE while dereferencing the value you got from it.
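
A small stand-alone sketch of that failure mode, using plain strings instead of streams. The key names and the helper are hypothetical; the point is only that java.util.Map.get returns null for a missing key, so the lookup should be checked before the value is used.

```java
import java.util.HashMap;
import java.util.Map;

final class MapLookupNpe {

    /** Returns the length of the value for key, failing with a clear message if the key is absent. */
    static int lengthOf(Map<String, String> map, String key) {
        String value = map.get(key); // returns null when the key is missing
        if (value == null) {
            // without this check, value.length() below would throw a NullPointerException
            throw new IllegalStateException(
                    "no entry for key '" + key + "', known keys: " + map.keySet());
        }
        return value.length();
    }

    public static void main(String[] args) {
        Map<String, String> streams = new HashMap<>();
        streams.put("search_list", "stream-A");

        System.out.println(lengthOf(streams, "search_list"));
        try {
            lengthOf(streams, "search_list_keyless"); // key was never put into the map
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Logging the keys that were actually registered, as the exception message does here, usually shows quickly whether the lookup key differs from the event names used when filling the map.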

Also, the approach looks a bit strange.
Can you describe what you are trying to achieve?

Regards,
Roman


On Mon, Feb 24, 2020 at 5:47 PM aj  wrote:

>
> I am trying the piece of code below to create multiple DataStream objects and
> store them in a map.
>
> for (EventConfig eventConfig : eventTypesList) {
>     LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
>     String key = eventConfig.getEvent_name();
>     final StreamingFileSink<GenericRecord> sink =
>         StreamingFileSink.forBulkFormat(
>                 path,
>                 ParquetAvroWriters.forGenericRecord(
>                     SchemaUtils.getSchema(eventConfig.getSchema_subject())))
>             .withBucketAssigner(new EventTimeBucketAssigner())
>             .build();
>
>     // Keep only the records that belong to this event type.
>     DataStream<GenericRecord> stream =
>         dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
>             genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));
>
>     Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 =
>         new Tuple2<>(stream, sink);
>     streamMap.put(key, tuple2);
> }
>
> DataStream<GenericRecord> searchStream =
>     streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
> searchStream.map(new Enricher())
>     .addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
>
>
> I am getting a NullPointerException when trying to get the stream from the map
> value at:
>
>
> *DataStream searchStream =
> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*
>
> As per my understanding, this is because the map is local to main and not
> broadcast to tasks.
> How should I do this? Please help me resolve it.
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


Re: Getting javax.management.InstanceAlreadyExistsException when upgraded to 1.10

2020-02-25 Thread Khachatryan Roman
Hi John,

Seems like this is another instance of
https://issues.apache.org/jira/browse/FLINK-8093
Could you please provide the full stacktrace?

Regards,
Roman


On Mon, Feb 24, 2020 at 10:48 PM John Smith  wrote:

> Hi. I just upgraded to 1.10.0 and am getting the error below when I deploy my
> tasks.
>
> The first one seems to deploy OK, but subsequent ones seem to throw
> this error. They still seem to work, though.
>
> javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=app-info,id=consumer-2
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:477)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:167)
>


Re: state schema evolution for case classes

2020-02-25 Thread Khachatryan Roman
Hi ApoorvK,

I understand that you have a savepoint created by Flink 1.6.2 and you want
to use it with Flink 1.8.2. The classes themselves weren't modified.
Is that correct?
Which serializer did you use?

Regards,
Roman


On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
wrote:

> Hi Team,
>
> Earlier we developed on Flink 1.6.2, so there are lots of case classes
> which have Maps and nested case classes within them, for example:
>
> case class MyCaseClass(var a: Boolean,
>  var b: Boolean,
>  var c: Boolean,
>  var d: NestedCaseClass,
>  var e:Int){
> def this() { this(false, false, false, new NestedCaseClass, 0) }
> }
>
>
> Now we have migrated to Flink 1.8.2, and I need help figuring out how I can
> achieve state schema evolution for such classes.
>
> 1. Will it work if I create Avro schemas for these classes now and implement
> Avro serialization?
> 2. Or should I register a Kryo serializer with a protobuf serializer on the env?
>
> Please suggest what can be done here, or point me to an Avro
> serialization example.
>
> Thanks
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to determine average utilization before backpressure kicks in?

2020-02-25 Thread Khachatryan Roman
Hi Morgan,

Thanks for your reply.

I think the only possible way to determine this limit is load testing. In
the end, this is what load testing is all about.
I can only suggest testing parts of the system separately to know their
individual limits (e.g. IO, CPU). Ideally, this should be done on a regular
basis.
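
As a worked version of the arithmetic in this thread: once a load test has established the maximum sustainable throughput, utilization is the current throughput divided by that maximum. The class below is a hypothetical helper, not a Flink API, and the 130,000 messages/sec figure is the assumed limit from the example in the quoted message.

```java
public class Utilization {
    /** Fraction of the measured maximum throughput that is currently in use. */
    static double utilization(double currentPerSec, double maxPerSec) {
        if (maxPerSec <= 0) {
            throw new IllegalArgumentException("maxPerSec must be positive");
        }
        return currentPerSec / maxPerSec;
    }

    /** Remaining capacity in messages/sec before the measured limit is reached. */
    static double headroomPerSec(double currentPerSec, double maxPerSec) {
        return Math.max(0.0, maxPerSec - currentPerSec);
    }

    public static void main(String[] args) {
        double current = 100_000; // observed average throughput
        double max = 130_000;     // limit found by load testing (assumed value)
        System.out.println("utilization: " + utilization(current, max));
        System.out.println("headroom (msg/sec): " + headroomPerSec(current, max));
    }
}
```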

Hope this helps.

Regards,
Roman


On Tue, Feb 25, 2020 at 2:47 PM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi Roman,
>
> Thank you for the reply.
>
> Yes, I am aware that backpressure can be the result of many factors, and
> yes, this is an oversimplification of something very complex; please bear
> with me. Let's assume that this has been taken into account and has lowered
> the threshold for when this status permanently comes into effect, i.e. HIGH.
>
> Example: The system is running along perfectly fine under normal
> conditions, accessing external sources, and processing at an average of
> 100,000 messages/sec. Let's assume the maximum capacity is around 130,000
> messages/sec before backpressure starts propagating back up the
> stream. Therefore, utilization is at about 0.77 (100K/130K). Great, but at
> present we don't know that 130,000 is the limit.
>
> For this example or for any job, is there a way of finding this maximum
> capacity (and hence the utilization) without pushing the system to its
> limit based on the current throughput? Possibly by measuring (as you say)
> the saturation of certain buffers (looking into this now; however, I am not
> too familiar with Flink internals)? It doesn't have to be extremely
> precise. Any hints would be greatly appreciated.
>
> Regards,
> M.
>
> On 25.02.20 13:34, Khachatryan Roman wrote:
>
> Hi Morgan,
>
> Regarding backpressure, it can be caused by a number of factors, e.g.
> writing to an external system or slow input partitions.
>
> However, if you know that a particular resource is a bottleneck then it
> makes sense to monitor its saturation.
> It can be done by using Flink metrics. Please see the documentation for
> more details:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html
>
> Regards,
> Roman
>
>
> On Tue, Feb 25, 2020 at 12:33 PM Morgan Geldenhuys <
> morgan.geldenh...@tu-berlin.de> wrote:
>
>> Hello community,
>>
>> I am fairly new to Flink and have a question concerning utilization. I
>> was hoping someone could help.
>>
>> I understand that backpressure is essentially the point at which utilization
>> has reached 100% for a particular streaming pipeline, meaning that
>> the application cannot "keep up" with the messages coming into the system.
>>
>> I was wondering, assuming a fairly stable input throughput, is there a
>> way of determining the average utilization as a percentage? Can we
>> determine how much more capacity each operator has before backpressure
>> kicks in from metrics alone, i.e. 60% of capacity for example? Knowing
>> that the maximum throughput of the DSP application is dictated by the
>> slowest part of the pipeline, we would need to identify the slowest
>> operator and then average horizontally.
>>
>> The only method that I can see of determining the point at which the
>> system cannot keep up any longer is by scaling the input throughput
>> slowly until the backpressure HIGH alarm is shown and hence the number
>> of messages/sec is known.
>>
>> Yes I know this is a gross oversimplification and there are many many
>> factors that need to be taken into account when dealing with
>> backpressure, but it would be nice to have a general indicator, a rough
>> estimate is fine.
>>
>> Thank you in advance.
>>
>> Regards,
>> M.
>>
>>
>>
>>
>


Re: How to determine average utilization before backpressure kicks in?

2020-02-25 Thread Khachatryan Roman
Hi Morgan,

Regarding backpressure, it can be caused by a number of factors, e.g.
writing to an external system or slow input partitions.

However, if you know that a particular resource is a bottleneck then it
makes sense to monitor its saturation.
It can be done by using Flink metrics. Please see the documentation for
more details:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html

Regards,
Roman


On Tue, Feb 25, 2020 at 12:33 PM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

> Hello community,
>
> I am fairly new to Flink and have a question concerning utilization. I
> was hoping someone could help.
>
> I understand that backpressure is essentially the point at which utilization
> has reached 100% for a particular streaming pipeline, meaning that
> the application cannot "keep up" with the messages coming into the system.
>
> I was wondering, assuming a fairly stable input throughput, is there a
> way of determining the average utilization as a percentage? Can we
> determine how much more capacity each operator has before backpressure
> kicks in from metrics alone, i.e. 60% of capacity for example? Knowing
> that the maximum throughput of the DSP application is dictated by the
> slowest part of the pipeline, we would need to identify the slowest
> operator and then average horizontally.
>
> The only method that I can see of determining the point at which the
> system cannot keep up any longer is by scaling the input throughput
> slowly until the backpressure HIGH alarm is shown and hence the number
> of messages/sec is known.
>
> Yes I know this is a gross oversimplification and there are many many
> factors that need to be taken into account when dealing with
> backpressure, but it would be nice to have a general indicator, a rough
> estimate is fine.
>
> Thank you in advance.
>
> Regards,
> M.
>
>
>
>


Re: Exactly once semantics for hdfs sink

2020-02-12 Thread Khachatryan Roman
Hi Vishwas,

Please let me know if you have any specific questions about the
StreamingFile sink.

Regards,
Roman


On Wed, Feb 12, 2020 at 4:45 AM Zhijiang  wrote:

> Hi Vishwas,
>
> I guess this link [1] can help understand how it works and how to use in
> practice for StreamingFileSink.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
>
> Best,
> Zhijiang
>
> --
> From:Vishwas Siravara 
> Send Time:2020 Feb. 12 (Wed.) 05:19
> To:Khachatryan Roman 
> Cc:user 
> Subject:Re: Exactly once semantics for hdfs sink
>
> Hi Khachatryan,
> Thanks for your reply. Can you help me understand how it works with HDFS
> specifically? Even a link to a document will help.
>
>
> Best,
> Vishwas
>
> On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
> Hi Vishwas,
>
> Yes, Streaming File Sink does support exactly-once semantics and can be
> used with HDFS.
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara 
> wrote:
> Hi all,
> I want to use the StreamingFileSink for writing data to HDFS. Can I
> achieve exactly-once semantics with this sink?
>
>
> Best,
> HW.
>
>
>


Re: Flink Minimal requirements

2020-02-10 Thread Khachatryan Roman
Hi Kristof,

Flink doesn't have any specific requirements.
You can run Flink on a single node with just one core. The number of
threads is dynamic.

However, you'll probably want to configure memory usage if the default
values are greater than what the actual machine has.
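
As a rough illustration of the memory settings mentioned above, a `flink-conf.yaml` for a small single-node setup might look like the fragment below. The option names are the ones used in Flink 1.10 (an assumption, since the thread does not state a version), and the values are arbitrary placeholders to be sized for the actual machine.

```yaml
# Assumed Flink 1.10 option names; values are placeholders, not recommendations.
jobmanager.heap.size: 512m
taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 1
```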

Regards,
Roman


On Mon, Feb 10, 2020 at 9:02 AM KristoffSC 
wrote:

> Hi all,
> well, this may be a slightly strange question, but are there any minimal
> machine requirements (memory size, CPU, etc.) and non-functional
> requirements (number of nodes, network ports, etc.) for Flink?
>
> I know it all boils down to what my deployed Job will be, but if we just
> could put this aside for a moment and focus on a bare minimum just for
> Flink.
>
> Probably we can say that Flink requires a minimum of 2 nodes, right?
> What about the minimal memory needed for the Flink runtime? How many threads
> does Flink's runtime use?
>
> Any thought about this one?
>
> Thanks,
> Krzysztof
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Exactly once semantics for hdfs sink

2020-02-10 Thread Khachatryan Roman
Hi Vishwas,

Yes, Streaming File Sink does support exactly-once semantics and can be
used with HDFS.

Regards,
Roman


On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara  wrote:

> Hi all,
> I want to use the StreamingFileSink for writing data to HDFS. Can I
> achieve exactly-once semantics with this sink?
>
>
> Best,
> HW.
>


Re: Flink connect hive with hadoop HA

2020-02-10 Thread Khachatryan Roman
Hi,

Could you please provide a full stacktrace?

Regards,
Roman


On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:

> Hi, guys
> I am using Flink 1.10 and testing functional cases with Hive integration:
> Hive 1.1.0-cdh5.3.0 with Hadoop HA enabled. When running the Flink job I can
> see a successful connection to the Hive metastore, but reading table data
> fails with this exception:
>
> java.lang.IllegalArgumentException: java.net.UnknownHostException:
> nameservice1
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>
> I am running a standalone application. It looks like my Hadoop conf file is
> missing from the Flink job's application classpath. Where should I configure it?
>
>
>
>


Re: [Help] Anyone know where I can find performance test result?

2020-02-10 Thread Khachatryan Roman
+ user@flink.apache.org (re-adding)

If you have a PR and would like to check the performance, you can reach out to
Flink committers to see the results at http://codespeed.dak8s.net:8000/

This UI uses https://github.com/tobami/codespeed, so you can also set it up
in your own environment.

Regards,
Roman


On Mon, Feb 10, 2020 at 10:10 AM 闫旭  wrote:

> Yes, it’s the one
>
> On Feb 10, 2020, at 5:08 PM, Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
> Hi Xu Yan,
>
> Do you mean flink-benchmarks repo?
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 4:18 AM 闫旭  wrote:
>
>> Hi there,
>>
>> I am just exploring the Apache Flink git repo and found the performance
>> tests. I have already run them on my local machine, and I'm wondering whether
>> there are online results.
>>
>> Thanks
>>
>> Regards
>>
>> Xu Yan
>>
>
>


Re: [Help] Anyone know where I can find performance test result?

2020-02-10 Thread Khachatryan Roman
Hi Xu Yan,

Do you mean flink-benchmarks repo?

Regards,
Roman


On Mon, Feb 10, 2020 at 4:18 AM 闫旭  wrote:

> Hi there,
>
> I am just exploring the Apache Flink git repo and found the performance
> tests. I have already run them on my local machine, and I'm wondering whether
> there are online results.
>
> Thanks
>
> Regards
>
> Xu Yan
>

