Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread suman shil
Hi Jing,
I tried using `MapBundleOperator` as well (I am yet to test with
`LinkedHashMap`). But I am always seeing that in the following code from
`AbstractMapBundleOperator.java`, `numOfElements` is always 0. It is
never getting incremented. I replaced `TaxiFareStream` with
`MapBundleOperator` in the above code. It should increment by 1 each time
an element is processed, but that is not happening.

public void processElement(StreamRecord<IN> element) throws Exception {
    // get the key and value for the map bundle
    final IN input = element.getValue();
    final K bundleKey = getKey(input);
    final V bundleValue = bundle.get(bundleKey);

    // get a new value after adding this element to bundle
    final V newBundleValue = function.addInput(bundleValue, input);

    // update to map bundle
    bundle.put(bundleKey, newBundleValue);

    numOfElements++;
    bundleTrigger.onElement(input);
}

One more question: you mentioned that I need to test with `LinkedHashMap`
instead of `HashMap`. Where should I make this change? Do I need to
create a class which extends `MapBundleOperator` and add it there?
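
For context, here is a standalone illustration of why the bundle map type matters (plain Java, not taken from the Flink source; the class name is just for the demo). A HashMap iterates in an arbitrary order, while a LinkedHashMap preserves insertion order, which is what keeps the emitted bundle consistent with the input order:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class BundleOrderDemo {
    public static void main(String[] args) {
        Map<Long, String> hashBundle = new HashMap<>();
        Map<Long, String> linkedBundle = new LinkedHashMap<>();
        for (long driverId : new long[] {42L, 7L, 13L, 3L}) {
            hashBundle.put(driverId, "fare-" + driverId);
            linkedBundle.put(driverId, "fare-" + driverId);
        }
        // HashMap order depends on hashing and may differ from arrival order.
        System.out.println("HashMap order:       " + hashBundle.keySet());
        // LinkedHashMap keeps the order in which keys were first inserted: [42, 7, 13, 3].
        System.out.println("LinkedHashMap order: " + linkedBundle.keySet());
    }
}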

Thanks


On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG  wrote:

> Hi Suman,
> Please try copying `MapBundleOperator` and updating the `HashMap` to a
> `LinkedHashMap` to keep the output sequence consistent with the input sequence.
>
> Best,
> JING ZHANG
>
> suman shil  于2021年8月20日周五 上午2:23写道:
>
>> Hi Jing,
>> Thanks for looking into this. Here is the code of `TaxiFareStream'. I was
>> following this link
>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>> . Please let me know if there is any other way of aggregating elements
>> locally.
>>
>> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>
>>     private KeySelector<TaxiFare, Long> keySelector;
>>
>>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>                           BundleTrigger<TaxiFare> bundleTrigger,
>>                           KeySelector<TaxiFare, Long> keySelector) {
>>         super(userFunction, bundleTrigger, keySelector);
>>         this.keySelector = keySelector;
>>     }
>>
>>     @Override
>>     protected Long getKey(TaxiFare input) throws Exception {
>>         return keySelector.getKey(input);
>>     }
>> }
>>
>> Thanks
>>
>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG  wrote:
>>
>>> Hi Suman,
>>> Would you please provide the code of `TaxiFareStream`? It seems we
>>> could use `MapBundleOperator` directly here.
>>> BTW, I have some concerns about using this solution for local aggregation
>>> before a window aggregation, because `MapBundleOperator` saves the input
>>> data in a bundle that is a HashMap, which does not preserve the input
>>> order. I'm afraid the elements within a bundle (of 10, in your case) may
>>> come out unordered, and I'm not sure it is reasonable to assign a
>>> watermark based on an unordered timestamp.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>>
>>> suman shil  于2021年8月19日周四 下午12:43写道:
>>>
 I am trying to do pre shuffle aggregation in flink. Following is the
 MapBundle implementation.

public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}

 I am using "CountBundleTrigger.java" . But the pre-shuffle aggregation
 is not working as the "*count*" variable is always 0. Please let me
 know If I am missing something.

@Override
public void onElement(T element) throws Exception {
    count++;
    if (count >= maxCount) {
        callback.finishBundle();
        reset();
    }
}

 Here is the main code.

MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
    @Override
    public Long getKey(TaxiFare value) throws Exception {
        return value.driverId;
    }
};

DataStream<Tuple3<Long, Long, Float>> hourlyTips =
//        fares.keyBy((TaxiFare fare) -> fare.driverId)
//             .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
        fares.transform("preshuffle",

Re: Kafka Metrics

2021-08-19 Thread Mason Chen
FYI, I'm referring to the legacy offsets metric gauges.

On Thu, Aug 19, 2021 at 4:53 PM Mason Chen  wrote:

> Hi all,
>
> We have found that the per partition Kafka metrics contributes to a lot of
> metrics being indexed by our metrics system.
>
> We would still like to have the proxied kafka metrics from the kafka
> clients library. Is there a flag to only exclude Flink's additional Kafka
> metrics?
>
> Best,
> Mason
>
>


submit new job is not working

2021-08-19 Thread Dhiru
Hello all,
I was able to run a sample example and upload a jar using the UI on the
cluster I have deployed on k8s.
Today I had to reboot the jobmanager; after that I am not able to upload any
jar to my cluster. I do not see any log to debug this either.
Any help?


--kumar 

Re: Error while deserializing the element

2021-08-19 Thread JING ZHANG
Hi Vijay, Yun,
I've created a JIRA https://issues.apache.org/jira/browse/FLINK-23886 to
track this.

Best,
JING ZHANG

JING ZHANG  于2021年8月20日周五 下午1:19写道:

> Hi Vijay,
> I have encountered the same problem several times in online production
> Flink jobs, but I have not found the root cause of the exception yet.
> We have worked around the exception by adding the following parameter; hope
> it helps you:
> state.backend.rocksdb.timer-service.factory: HEAP
>
> I would invite Yun Tang, who is an expert on this topic, to look into the
> problem; we could also create a JIRA to track the issue.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#timers-heap-vs-rocksdb
>
> Best,
> JING ZHANG
>
> vijayakumar palaniappan  于2021年8月19日周四 上午8:02写道:
>
>> Setup Specifics:
>> Version: 1.6.2
>> RocksDB Map State
>> Timers stored in rocksdb
>>
>> When we have this job running for long periods of time like > 30 days, if
>> for some reason the job restarts, we encounter "Error while
>> deserializing the element". Is this a known issue fixed in later
>> versions? I see some changes to code for FLINK-10175, but we don't use
>> any queryable state
>>
>> Below is the stack trace
>>
>> org.apache.flink.util.FlinkRuntimeException: Error while deserializing
>> the element.
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
>>
>> at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
>>
>> at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
>>
>> at
>> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
>>
>> at
>> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
>>
>> at
>> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
>>
>> at
>> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>
>> at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
>>
>> at
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
>>
>> at
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
>>
>> at
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
>>
>> at
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.io.EOFException
>>
>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>
>> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
>>
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
>>
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
>>
>> at
>> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
>>
>> at
>> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
>>
>> ... 20 more
>>
>> --
>> Thanks,
>> -Vijay
>>
>


Re: Error while deserializing the element

2021-08-19 Thread JING ZHANG
Hi Vijay,
I have encountered the same problem several times in online production
Flink jobs, but I have not found the root cause of the exception yet.
We have worked around the exception by adding the following parameter; hope
it helps you:
state.backend.rocksdb.timer-service.factory: HEAP

I would invite Yun Tang, who is an expert on this topic, to look into the
problem; we could also create a JIRA to track the issue.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#timers-heap-vs-rocksdb

Best,
JING ZHANG

vijayakumar palaniappan  于2021年8月19日周四 上午8:02写道:

> Setup Specifics:
> Version: 1.6.2
> RocksDB Map State
> Timers stored in rocksdb
>
> When we have this job running for long periods of time like > 30 days, if
> for some reason the job restarts, we encounter "Error while deserializing
> the element". Is this a known issue fixed in later versions? I see some
> changes to code for FLINK-10175, but we don't use any queryable state
>
> Below is the stack trace
>
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the
> element.
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
>
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
>
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
>
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
>
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
>
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
>
> at
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
>
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
>
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
>
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
>
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.EOFException
>
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
>
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
>
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
>
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
>
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
>
> ... 20 more
>
> --
> Thanks,
> -Vijay
>


Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi!

I've created a JIRA ticket[1] for this issue. Please check it out and track
the progress there.

[1] https://issues.apache.org/jira/browse/FLINK-23885

Caizhi Weng  于2021年8月20日周五 上午10:47写道:

> Hi!
>
> This is because TypeExtractor#getMapReturnTypes are not dealing with row
> types (see that method and also TypeExtractor#privateGetForClass). You
> might want to open a JIRA ticket for this.
>
> Matthias Broecheler  于2021年8月20日周五 上午7:01写道:
>
>> Hey Flinkers,
>>
>> I am trying to follow the docs to
>> convert a DataStream to a Table. Specifically, I have a DataStream of Row
>> and want the columns of the row to become the columns of the resulting
>> table.
>>
>> That works but only if I construct the Rows statically. If I construct
>> them dynamically (in a map) then Flink turns the entire Row into one column
>> of type "RAW('org.apache.flink.types.Row', '...')".
>>
>> Does anybody know why this is the case or how to fix it? Take a look at
>> the simple Flink program below where I construct the DataStream "rows" in
>> two different ways. I would expect those to be identical (and the sink does
>> print identical information) but the inferred table schema is different.
>>
>> Thanks a ton,
>> Matthias
>>
>> --
>>
>> StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>> DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>>
>> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>>
>> // This alternative way of constructing this data stream produces the expected table schema
>> // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), Row.of("Name5", 5));
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
>> Table table = tableEnv.fromDataStream(rows);
>> table.printSchema();
>>
>> rows.addSink(new PrintSinkFunction<>());
>>
>> flinkEnv.execute();
>>
>>


Re: Periodic output at end of stream

2021-08-19 Thread JING ZHANG
Hi Matthias,
Thanks for providing the example, I would reply back soon after I do some
debug.

Best,
JING ZHANG

Matthias Broecheler  于2021年8月19日周四 上午1:53写道:

> Hey JING,
>
> thanks for getting back to me. I tried to produce the smallest,
> self-contained example that produces the phenomenon:
> https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f
>
> If you run MainRepl you should see an infinite loop of re-processing the 5
> integers. The offending process is BufferedLatestSelector - specifically
> the event timer that is registered in it. Without the timer the process
> will not emit an output.
>
> The timer is set whenever the state is null. Is there a problem with how I
> implemented that buffering process?
> Thank you,
> Matthias
>
> On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG  wrote:
>
>> Hi Matthias,
>> How often do you register the event-time timer?
>> It is registered per input record, or re-registered a new timer after an
>> event-time timer is triggered?
>> Would you please provide your test case code, it would be very helpful
>> for troubleshooting.
>>
>> Best wishes,
>> JING ZHANG
>>
>> Matthias Broecheler  于2021年8月14日周六 上午3:44写道:
>>
>>> Hey guys,
>>>
>>> I have a KeyedProcessFunction that gathers statistics on the events that
>>> flow through and emits it periodically (every few seconds) to a SideOutput.
>>> However, at the end of stream the last set of statistics don't get
>>> emitted. I read on the mailing list that processing time timers that are
>>> pending don't get triggered when Flink cleans up a stream, but that event
>>> timers do get triggered because a watermark with Long.MAX_VALUE is sent
>>> through the stream.
>>> Hence, I thought that I could register a "backup" event timer for
>>> Long.MAX_VALUE-1 to make sure that my process function gets notified when
>>> the stream ends to emit the in-flight statistics.
>>>
>>> However, now my simple test case (with a data source fromCollection of 4
>>> elements) keeps iterating over the same 4 elements in an infinite loop.
>>>
>>> I don't know how to make sense of this and would appreciate your help.
>>> Is there a better way to set a timer that gets triggered at the end of
>>> stream?
>>> And for my education: Why does registering an event timer cause an
>>> infinite loop over the source elements?
>>>
>>> Thanks a lot and have a wonderful weekend,
>>> Matthias
>>>
>>
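
For reference, a minimal, self-contained sketch of the end-of-stream timer pattern discussed in this thread (the names and statistics are illustrative, not the code from the gist above): register one event-time timer per key at Long.MAX_VALUE - 1, which fires when the final Long.MAX_VALUE watermark is sent at the end of a bounded input.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EndOfStreamStats extends KeyedProcessFunction<Long, Long, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        Long current = count.value();
        if (current == null) {
            current = 0L;
            // Registered once per key; only fires when the end-of-stream watermark arrives.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
        }
        count.update(current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Emit the in-flight statistics for this key when the stream ends.
        out.collect("key=" + ctx.getCurrentKey() + " count=" + count.value());
    }
}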


Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread JING ZHANG
Hi Suman,
Please try copying `MapBundleOperator` and updating the `HashMap` to a
`LinkedHashMap` to keep the output sequence consistent with the input sequence.

Best,
JING ZHANG

suman shil  于2021年8月20日周五 上午2:23写道:

> Hi Jing,
> Thanks for looking into this. Here is the code of `TaxiFareStream'. I was
> following this link
> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
> . Please let me know if there is any other way of aggregating elements
> locally.
>
> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>
>     private KeySelector<TaxiFare, Long> keySelector;
>
>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>                           BundleTrigger<TaxiFare> bundleTrigger,
>                           KeySelector<TaxiFare, Long> keySelector) {
>         super(userFunction, bundleTrigger, keySelector);
>         this.keySelector = keySelector;
>     }
>
>     @Override
>     protected Long getKey(TaxiFare input) throws Exception {
>         return keySelector.getKey(input);
>     }
> }
>
> Thanks
>
> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG  wrote:
>
>> Hi Suman,
>> Would you please provide the code of `TaxiFareStream`? It seems we
>> could use `MapBundleOperator` directly here.
>> BTW, I have some concerns about using this solution for local aggregation
>> before a window aggregation, because `MapBundleOperator` saves the input
>> data in a bundle that is a HashMap, which does not preserve the input
>> order. I'm afraid the elements within a bundle (of 10, in your case) may
>> come out unordered, and I'm not sure it is reasonable to assign a
>> watermark based on an unordered timestamp.
>>
>> Best,
>> JING ZHANG
>>
>>
>>
>> suman shil  于2021年8月19日周四 下午12:43写道:
>>
>>> I am trying to do pre shuffle aggregation in flink. Following is the
>>> MapBundle implementation.
>>>
>>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>
>>>     @Override
>>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>         if (value == null) {
>>>             return input;
>>>         }
>>>         value.tip = value.tip + input.tip;
>>>         return value;
>>>     }
>>>
>>>     @Override
>>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>             out.collect(entry.getValue());
>>>         }
>>>     }
>>> }
>>>
>>> I am using "CountBundleTrigger.java" . But the pre-shuffle aggregation
>>> is not working as the "*count*" variable is always 0. Please let me
>>> know If I am missing something.
>>>
>>> @Override
>>> public void onElement(T element) throws Exception {
>>>     count++;
>>>     if (count >= maxCount) {
>>>         callback.finishBundle();
>>>         reset();
>>>     }
>>> }
>>>
>>> Here is the main code.
>>>
>>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>     @Override
>>>     public Long getKey(TaxiFare value) throws Exception {
>>>         return value.driverId;
>>>     }
>>> };
>>>
>>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>> //        fares.keyBy((TaxiFare fare) -> fare.driverId)
>>> //             .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>                         new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>              .assignTimestampsAndWatermarks(
>>>                      new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>                          @Override
>>>                          public long extractTimestamp(TaxiFare element) {
>>>                              return element.startTime.getEpochSecond();
>>>                          }
>>>                      })
>>>              .keyBy((TaxiFare fare) -> fare.driverId)
>>>              .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>              .process(new AddTips());
>>>
>>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>
>>> Thanks
>>> Suman
>>>
>>


Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Jingsong Li
Hi Yik,

The **batch** Hive sink does not support `sink.partition-commit.policy.kind`.

By default, the **batch** Hive sink commits to the metastore without writing a success-file.

You can create a JIRA for this.

Best,
Jingsong

On Fri, Aug 20, 2021 at 11:01 AM Caizhi Weng  wrote:
>
> Hi!
>
> As far as I know Flink batch jobs will not add the _SUCCESS file. However for 
> batch jobs you can register a JobListener and add the _SUCCESS file by 
> yourself in JobListener#onJobExecuted. See registerJobListener method in 
> StreamExecutionEnvironment.
>
> Yik San Chan  于2021年8月20日周五 上午10:26写道:
>>
>> Hi community,
>>
>> According to the 
>> [docs](https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy),
>>  if I create a Hive table with config 
>> sink.partition-commit.policy.kind="metastore,success-file", once the write 
>> to the **streaming** Hive sink is finished:
>>
>> - The HDFS directory will be registered to the Hive metastore,
>> - There will be a _SUCCESS file written to the directory when the job 
>> finishes.
>>
>> An example result directory on HDFS looks like this:
>>
>> [10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
>> Found 9 items
>> -rw-r-   2 basedata aiinfra  0 2021-08-20 08:56 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
>> -rw-r-   2 basedata aiinfra   10684668 2021-08-20 08:49 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
>> -rw-r-   2 basedata aiinfra   10712792 2021-08-20 08:48 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
>> -rw-r-   2 basedata aiinfra   10759066 2021-08-20 08:46 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
>> -rw-r-   2 basedata aiinfra   10754886 2021-08-20 08:46 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
>> -rw-r-   2 basedata aiinfra   10681155 2021-08-20 08:45 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
>> -rw-r-   2 basedata aiinfra   10725101 2021-08-20 08:46 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
>> -rw-r-   2 basedata aiinfra   10717976 2021-08-20 08:56 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
>> -rw-r-   2 basedata aiinfra   10585453 2021-08-20 08:45 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0
>>
>> There are 8 part-* files because I set the flink run parallelism to 8. After 
>> all part-* are written, a _SUCCESS file is added (see the timestamp 08:56, 
>> which is later than all the rest).
>>
>> I wonder: can I do the same with **batch** Hive sink as well? Ideally, after 
>> the job finishes, I would like to have a _SUCCESS file added to the 
>> directory. However, I haven't figured out how to do it yet.
>>
>> Any help? Thanks!
>>
>> Best,
>> Yik San



-- 
Best, Jingsong Lee


Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Caizhi Weng
Hi!

As far as I know, Flink batch jobs will not add the _SUCCESS file. However,
for batch jobs you can register a JobListener and add the _SUCCESS file
yourself in JobListener#onJobExecuted. See the registerJobListener method in
StreamExecutionEnvironment.
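
For reference, a rough sketch of this JobListener approach (the partition directory and class name are assumptions for illustration, not from the thread):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SuccessFileListener implements JobListener {

    private final Path successFile;

    public SuccessFileListener(String partitionDir) {
        this.successFile = new Path(partitionDir, "_SUCCESS");
    }

    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // Nothing to do on submission.
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        if (throwable != null || result == null) {
            return; // The job failed, so do not write the marker.
        }
        try {
            // Create an empty _SUCCESS marker in the partition directory.
            FileSystem fs = successFile.getFileSystem();
            fs.create(successFile, FileSystem.WriteMode.OVERWRITE).close();
        } catch (Exception e) {
            throw new RuntimeException("Could not write " + successFile, e);
        }
    }
}

// Registration (sketch): env.registerJobListener(new SuccessFileListener("hdfs:///path/to/partition"));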

Yik San Chan  于2021年8月20日周五 上午10:26写道:

> Hi community,
>
> According to the [docs](
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy),
> if I create a Hive table with config
> sink.partition-commit.policy.kind="metastore,success-file", once the write
> to the **streaming** Hive sink is finished:
>
> - The HDFS directory will be registered to the Hive metastore,
> - There will be a _SUCCESS file written to the directory when the job
> finishes.
>
> An example result directory on HDFS looks like this:
>
> [10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
> Found 9 items
> -rw-r-   2 basedata aiinfra  0 2021-08-20 08:56
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
> -rw-r-   2 basedata aiinfra   10684668 2021-08-20 08:49
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
> -rw-r-   2 basedata aiinfra   10712792 2021-08-20 08:48
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
> -rw-r-   2 basedata aiinfra   10759066 2021-08-20 08:46
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
> -rw-r-   2 basedata aiinfra   10754886 2021-08-20 08:46
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
> -rw-r-   2 basedata aiinfra   10681155 2021-08-20 08:45
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
> -rw-r-   2 basedata aiinfra   10725101 2021-08-20 08:46
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
> -rw-r-   2 basedata aiinfra   10717976 2021-08-20 08:56
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
> -rw-r-   2 basedata aiinfra   10585453 2021-08-20 08:45
> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0
>
> There are 8 part-* files because I set the flink run parallelism to 8.
> After all part-* are written, a _SUCCESS file is added (see the timestamp
> 08:56, which is later than all the rest).
>
> I wonder: can I do the same with **batch** Hive sink as well? Ideally,
> after the job finishes, I would like to have a _SUCCESS file added to the
> directory. However, I haven't figured out how to do it yet.
>
> Any help? Thanks!
>
> Best,
> Yik San
>


Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi!

This is because TypeExtractor#getMapReturnTypes does not handle row
types (see that method and also TypeExtractor#privateGetForClass). You
might want to open a JIRA ticket for this.
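
For reference, one possible workaround (an assumption, not confirmed in this thread) is to declare the row type explicitly on the mapped stream via returns(), so the extractor does not fall back to a RAW Row type. The field names here are made up for the example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

DataStream<Row> rows = integers
        .map(i -> Row.of("Name" + i, i))
        // Explicit type information replaces the generic/RAW type inferred for the lambda.
        .returns(Types.ROW_NAMED(new String[] {"name", "number"}, Types.STRING, Types.INT));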

Matthias Broecheler  于2021年8月20日周五 上午7:01写道:

> Hey Flinkers,
>
> I am trying to follow the docs to
> convert a DataStream to a Table. Specifically, I have a DataStream of Row
> and want the columns of the row to become the columns of the resulting
> table.
>
> That works but only if I construct the Rows statically. If I construct
> them dynamically (in a map) then Flink turns the entire Row into one column
> of type "RAW('org.apache.flink.types.Row', '...')".
>
> Does anybody know why this is the case or how to fix it? Take a look at
> the simple Flink program below where I construct the DataStream "rows" in
> two different ways. I would expect those to be identical (and the sink does
> print identical information) but the inferred table schema is different.
>
> Thanks a ton,
> Matthias
>
> --
>
> StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>
> // This alternative way of constructing this data stream produces the expected table schema
> // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), Row.of("Name5", 5));
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
> Table table = tableEnv.fromDataStream(rows);
> table.printSchema();
>
> rows.addSink(new PrintSinkFunction<>());
>
> flinkEnv.execute();
>
>


How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Yik San Chan
Hi community,

According to the [docs](
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy),
if I create a Hive table with config
sink.partition-commit.policy.kind="metastore,success-file", once the write
to the **streaming** Hive sink is finished:

- The HDFS directory will be registered to the Hive metastore,
- There will be a _SUCCESS file written to the directory when the job
finishes.

An example result directory on HDFS looks like this:

[10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
Found 9 items
-rw-r-   2 basedata aiinfra  0 2021-08-20 08:56
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
-rw-r-   2 basedata aiinfra   10684668 2021-08-20 08:49
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
-rw-r-   2 basedata aiinfra   10712792 2021-08-20 08:48
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
-rw-r-   2 basedata aiinfra   10759066 2021-08-20 08:46
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
-rw-r-   2 basedata aiinfra   10754886 2021-08-20 08:46
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
-rw-r-   2 basedata aiinfra   10681155 2021-08-20 08:45
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
-rw-r-   2 basedata aiinfra   10725101 2021-08-20 08:46
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
-rw-r-   2 basedata aiinfra   10717976 2021-08-20 08:56
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
-rw-r-   2 basedata aiinfra   10585453 2021-08-20 08:45
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0

There are 8 part-* files because I set the flink run parallelism to 8.
After all part-* are written, a _SUCCESS file is added (see the timestamp
08:56, which is later than all the rest).

I wonder: can I do the same with **batch** Hive sink as well? Ideally,
after the job finishes, I would like to have a _SUCCESS file added to the
directory. However, I haven't figured out how to do it yet.

Any help? Thanks!

Best,
Yik San


Kafka Metrics

2021-08-19 Thread Mason Chen
Hi all,

We have found that the per-partition Kafka metrics contribute to a lot of
metrics being indexed by our metrics system.

We would still like to have the proxied kafka metrics from the kafka
clients library. Is there a flag to only exclude Flink's additional Kafka
metrics?

Best,
Mason


Looking for suggestions about multithreaded CEP to be used with flink

2021-08-19 Thread Tejas B
Hi,
Here's our use case :
We are planning to build a rule-based engine on top of Flink with a huge
number of rules (1000s). The rules could be stateless or stateful.
An example stateless rule is: A.id = 3 && A.name = 'abc' || A.color = red.
An example stateful rule is: A is event.id = 3, B is event.name = 'abc', C is
event.color = red, and we are looking for the pattern AB*C over a time window
of 1 hour.

We have tried to use Flink CEP for this purpose, and the program crashed
because of too many threads. The explanation is: every application of
CEP.pattern creates a new operator in the graph, and Flink can't support that
many vertices in a graph.

Another approach could be to use a ProcessFunction in Flink, but to run the
rules on the event stream you'd still have to use some sort of CEP or write
your own.

My question is: does anybody have any other suggestions on how to achieve
this? Are there other CEP engines that integrate and work better with Flink
(Siddhi, Jasper, Drools)? Any experience would be helpful.


DataStream to Table API

2021-08-19 Thread Matthias Broecheler
Hey Flinkers,

I am trying to follow the docs to
convert a DataStream to a Table. Specifically, I have a DataStream of Row
and want the columns of the row to become the columns of the resulting
table.

That works but only if I construct the Rows statically. If I construct them
dynamically (in a map) then Flink turns the entire Row into one column of
type "RAW('org.apache.flink.types.Row', '...')".

Does anybody know why this is the case or how to fix it? Take a look at the
simple Flink program below where I construct the DataStream "rows" in two
different ways. I would expect those to be identical (and the sink does
print identical information) but the inferred table schema is different.

Thanks a ton,
Matthias

--

StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);

DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));

// This alternative way of constructing this data stream produces the expected table schema
// DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), Row.of("Name5", 5));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
Table table = tableEnv.fromDataStream(rows);
table.printSchema();

rows.addSink(new PrintSinkFunction<>());

flinkEnv.execute();


RE: failures during job start

2021-08-19 Thread Colletta, Edward
Thank you. I am going to try the first option for now, but I do need to
figure out why deployment takes so long.
Are there any metrics or log patterns that would indicate which task is waiting 
and which task is being waited on?


From: Chesnay Schepler 
Sent: Thursday, August 19, 2021 2:23 PM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: failures during job start

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

This exception means that a task was deployed, but the task that produces the 
data it wants to consume was not available yet (even after waiting for a while).

Your case sounds similar to https://issues.apache.org/jira/browse/FLINK-9413, 
where this happens because the deployment of the producing task takes too long.

You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate why 
the deployment takes so long in the first place.

On 19/08/2021 07:15, Colletta, Edward wrote:
Any help with this would be appreciated.   Is it possible that this is a 
data/application issue or a flink config/resource issue?

Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes fails 
with PartitionNotFoundException, but succeeds on restart.   The job has 10 
kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers is low 
(below 30%)

The scenario we see

  1.  run request to load and run a jar, job appears on dashboard with all 160 
subtasks in Deploying state
  2.  after 2 minutes some subtasks start transitioning to running.
  3.  after another 30 seconds failure occurs and job goes into Restarting state
  4.  after another minute, restart completes all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)








Re: Process suspend when get Hana connection in open method of sink function

2021-08-19 Thread Chesnay Schepler
If the Hana driver cannot be loaded then the most likely reason is that 
the dependency is not actually on the classpath.

Please double-check that your user jar bundles the dependency.

On 18/08/2021 15:05, Chenzhiyuan(HR) wrote:


Dear all:

I have a problem when I want to sink data to Hana database.

The process is suspended when getting the Hana connection in the open method
of the sink function, as shown below.


My flink version is 1.10.

public class HrrmPayValueSumToHana extends RichSinkFunction {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HrrmUtils.getHanaConnection();    // process is suspended here
    }

    @Override
    public void invoke() {
        …….
    }

    @Override
    public void close() throws Exception {
        ……….
    }
}

public static Connection getHanaConnection() {
    Connection con = null;
    try {
        Class.forName(HrrmConstants.HANA_DRIVER_CLASS);
        con = DriverManager.getConnection(HrrmConstants.HANA_SOURCE_DRIVER_URL,
                HrrmConstants.HANA_SOURCE_USER,
                HrrmConstants.HANA_SOURCE_PASSWORD);
    } catch (Exception e) {
        LOG.error("---hana get connection has exception , msg = ", e);
    }
    return con;
}


Hana driver dependency as below:

<dependency>
    <groupId>com.sap.cloud.db.jdbc</groupId>
    <artifactId>ngdbc</artifactId>
    <version>2.3.62</version>
</dependency>






Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread suman shil
Hi Jing,
Thanks for looking into this. Here is the code of `TaxiFareStream'. I was
following this link
http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
. Please let me know if there is any other way of aggregating elements
locally.

public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {

    private KeySelector<TaxiFare, Long> keySelector;

    public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                          BundleTrigger<TaxiFare> bundleTrigger,
                          KeySelector<TaxiFare, Long> keySelector) {
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

Thanks

On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG  wrote:

> Hi Suman,
> Would you please provide the code of `TaxiFareStream`? It seems we
> could use `MapBundleOperator` directly here.
> BTW, I have some concerns about using this solution for local aggregation
> before a window aggregation, because `MapBundleOperator` saves the input
> data in a bundle that is a HashMap, which does not preserve the input
> order. I'm afraid the elements within a bundle (of 10, in your case) may
> come out unordered, and I'm not sure it is reasonable to assign a
> watermark based on an unordered timestamp.
>
> Best,
> JING ZHANG
>
>
>
> suman shil  于2021年8月19日周四 下午12:43写道:
>
>> I am trying to do pre shuffle aggregation in flink. Following is the
>> MapBundle implementation.
>>
>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>
>>     @Override
>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>         if (value == null) {
>>             return input;
>>         }
>>         value.tip = value.tip + input.tip;
>>         return value;
>>     }
>>
>>     @Override
>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>             out.collect(entry.getValue());
>>         }
>>     }
>> }
>>
>> I am using "CountBundleTrigger.java" . But the pre-shuffle aggregation is
>> not working as the "*count*" variable is always 0. Please let me know If
>> I am missing something.
>>
>> @Override
>> public void onElement(T element) throws Exception {
>>     count++;
>>     if (count >= maxCount) {
>>         callback.finishBundle();
>>         reset();
>>     }
>> }
>>
>> Here is the main code.
>>
>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>     @Override
>>     public Long getKey(TaxiFare value) throws Exception {
>>         return value.driverId;
>>     }
>> };
>>
>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>> //        fares.keyBy((TaxiFare fare) -> fare.driverId)
>> //             .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>                         new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>              .assignTimestampsAndWatermarks(
>>                      new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>                          @Override
>>                          public long extractTimestamp(TaxiFare element) {
>>                              return element.startTime.getEpochSecond();
>>                          }
>>                      })
>>              .keyBy((TaxiFare fare) -> fare.driverId)
>>              .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>              .process(new AddTips());
>>
>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>
>> Thanks
>> Suman
>>
>


Re: failures during job start

2021-08-19 Thread Chesnay Schepler
This exception means that a task was deployed, but the task that 
produces the data it wants to consume was not available yet (even after 
waiting for a while).


Your case sounds similar to 
https://issues.apache.org/jira/browse/FLINK-9413, where this happens 
because the deployment of the producing task takes too long.


You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate 
why the deployment takes so long in the first place.


On 19/08/2021 07:15, Colletta, Edward wrote:


Any help with this would be appreciated.   Is it possible that this is 
a data/application issue or a flink config/resource issue?


Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes 
fails with PartitionNotFoundException, but succeeds on restart.   The 
job has 10 kafka sources (10 partitions for each topic) and parallelism 5.


The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers 
is low (below 30%)


The scenario we see

  * run request to load and run a jar, job appears on dashboard with
all 160 subtasks in Deploying state
  * after 2 minutes some subtasks start transitioning to running.
  * after another 30 seconds failure occurs and job goes into
Restarting state
  * after another minute, restart completes all nodes running.

Exception history shows

2021-08-15 07:55:02

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 
205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.


    at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)


    at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)


    at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)


    at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)


    at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)


    at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)


    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)

    at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)


    at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)


    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)


    at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)


    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






Re: Just failed while starting

2021-08-19 Thread Chesnay Schepler
Can you share the logs with us (ideally on DEBUG if available) from the 
affected TaskManager and JobManager?


On 19/08/2021 08:29, Ivan Yang wrote:

Dear Flink community,

I recently ran into this issue at job startup. It happens from time to
time. Here is the exception from the job manager:

2021-08-17 01:21:01,944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Defence raw 
event prod05_analytics_output -> String to JSON -> (Sink: Malformed JSON Sink, ThreatSight Filter -> Raw Json 
to ThreatSight Json -> ThreatSight Json to String -> Sink: ThreatSight Sink, Json to CDCA -> (mssp eventlog 
filter 1 -> mssp eventlog filter 2 -> CDCA to eventlog json -> Flat Map, Sink: Parquet Sink Event Time), mssp 
json filter -> action filter -> raw json to action json, Filter -> Json to ResponseAlarm, Filter -> json 
to SecurityEvent) (542/626) (a7be17221c0726a67679091062cfa8dc) switched from DEPLOYING to FAILED on 
172.1.200.173:6122-856ad2 @ ip-172-1-200-173.ec2.internal (dataPort=6121).
org.apache.flink.util.FlinkException: Could not mark slot 
58af05c3109a0fe8f96ea8936c0783a4 active.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$handleAcceptedSlotOffers$18(TaskExecutor.java:1561)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_302]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_302]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 ~[?:1.8.0_302]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Discarding 
the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Discarding 
the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:01,999 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 3150 tasks should be restarted to recover the failed task 
7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:02,012 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Event 
Router prod05 (0f10bafc07e48918f955fb22

Re: Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-19 Thread Chesnay Schepler
How do you deploy Flink on Kubernetes? Do you use the standalone or native
mode?


Is it really just task managers going down? It seems unlikely that the 
loss of a TM could have such an effect.


Can you provide us with the JobManager logs at the time the TM crash 
occurred? They should contain some hints as to how Flink handled the TM 
failure.



On 19/08/2021 16:06, Kevin Lam wrote:

Hi all,

I've noticed that sometimes when task managers go down, it looks like
the job is not restored from a checkpoint but instead restarted from a
fresh state (when I go to the job's checkpoint tab in the UI, I don't
see the restore, and the numbers in the job overview all get reset).
Under what circumstances does this happen?


I've been trying to debug and we really want the job to restore from 
the checkpoint at all times for our use case.


We're running Apache Flink 1.13 on Kubernetes in a high availability 
set-up.


Thanks in advance!





Re: map concurrent modification exception analysis when checkpoint

2021-08-19 Thread Chesnay Schepler
Essentially this exception means that the state was modified while a 
snapshot was being taken.


We usually see this when users hold on to some state value beyond a 
single call to a user-defined function, particularly from different threads.


We may be able to pinpoint the issue if you were to provide us with the 
functions.
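
For reference, a minimal sketch (illustrative only, not the user's actual functions) of the kind of pattern that triggers this: a mutable object stored in heap state is handed to another thread and modified there, while a checkpoint may be iterating over and serializing the same object.

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UnsafeStateSharing extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<HashMap<String, Long>> counters;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        counters = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "counters", TypeInformation.of(new TypeHint<HashMap<String, Long>>() {})));
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        HashMap<String, Long> map = counters.value();
        if (map == null) {
            map = new HashMap<>();
            counters.update(map);
        }
        final HashMap<String, Long> sharedRef = map;
        // BUG: the map held in heap state is mutated from a different thread. If a
        // checkpoint iterates over it at the same time, the snapshot can fail with
        // ConcurrentModificationException, as in the stack trace below.
        executor.submit(() -> sharedRef.merge(value, 1L, Long::sum));
        out.collect(value);
    }
}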


On 19/08/2021 16:59, yidan zhao wrote:

The Flink web UI shows the exception as follows.
In the task (ual_transform_UserLogBlackUidJudger ->
ual_transform_IpLabel), the first operator is a broadcast process
function and the second is an async function. I do not know
whether the issue is related to that.

The issue did not occur before; it started after I upgraded to
Flink 1.13.2.



Exception info from the Flink web UI:

java.io.IOException: Could not perform checkpoint 58 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 58 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
    ... 20 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1479)

Re: Metrics outside of RichFunction context

2021-08-19 Thread Chesnay Schepler
I don't believe there are other options, outside of creating a ticket to 
have Flink extend the API accordingly.
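For comparison, this is the hook that exists inside rich functions but is missing from the BulkWriter API — a minimal sketch using the standard metric group; the counter name and the map function are illustrative only, not part of the thread's code:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class SchemaLookupCounter extends RichMapFunction<String, String> {
    private transient Counter lookups;

    @Override
    public void open(Configuration parameters) {
        // RuntimeContext -> MetricGroup is the entry point a BulkWriter does not get
        lookups = getRuntimeContext().getMetricGroup().counter("schemaLookups");
    }

    @Override
    public String map(String value) {
        lookups.inc();  // count one lookup per element
        return value;
    }
}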


On 19/08/2021 16:40, John Karp wrote:

Hi,

I'm using StreamingFileSink to collect records into avro files. Inside 
of the BulkWriter implementation, I have to do some operations (such 
as dynamic schema lookup) which I want to record metrics about. 
However, the BulkWriter API, as it is defined, does not accept a 
RuntimeContext or MetricGroup from the StreamingFileSink, so I 
seemingly don't have access to the metrics API. And it doesn't look 
like StreamingFileSink is particularly extensible. What alternatives 
do I have besides forking StreamingFileSink, or bypassing the Flink 
metrics API entirely?


Thanks,
John





Re: Apache Flink metrics are not aligned in the reporter

2021-08-19 Thread Chesnay Schepler

What reporter interval do you have configured?
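For reference, the Graphite reporter and its interval are configured in flink-conf.yaml, roughly as below; the host, port and the 60-second interval are placeholder values:

metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: graphite.example.com
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS

With a long reporting interval, the first numRecordsOut datapoints can show up noticeably later than the job's uptime.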

On 19/08/2021 13:31, Jawad Tahir wrote:

Hi,

I have defined Graphite as my metrics reporter with my Flink (v1.13.2). 
My pipeline is pretty simple. It consists of one source, one stateful 
operator (a simple window aggregation), and one sink (the 
operations-playground, basically). I have set the parallelism to 2. 
The graph of the pipeline is as follows:

[Flink pipeline][1]

The program is running well and producing the correct results. 
However, when I look at the metrics, I see that the source started 
sending records well after the job started, even though my sink was 
producing correct results from the start of the job. Here is the 
[graph][2] of the job's uptime and the source's numRecordsOut. As far 
as I understand, a Flink source's numRecordsOut should start increasing 
together with uptime, since my sink was producing correct results from 
the start.



  [1]: https://i.stack.imgur.com/rZm5h.png 

  [2]: https://i.stack.imgur.com/nlBoS.png 






Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread JING ZHANG
Hi Suman,
Would you please provide the code of `TaxiFareStream`? It seems we
could use `MapBundleOperator` directly here.
BTW, I have some concerns about using this solution for local aggregation
before a window aggregation, because `MapBundleOperator` stores the input
data of a bundle in a HashMap, which does not preserve the input order.
I'm afraid the records within a bundle (in your case, 10 records) may end
up unordered, and I'm not sure whether it is reasonable to assign
watermarks based on unordered timestamps.

Best,
JING ZHANG



suman shil wrote on Thu, Aug 19, 2021 at 12:43 PM:

> I am trying to do pre shuffle aggregation in flink. Following is the
> MapBundle implementation.
>
> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>
>     @Override
>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>         if (value == null) {
>             return input;
>         }
>         value.tip = value.tip + input.tip;
>         return value;
>     }
>
>     @Override
>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>             out.collect(entry.getValue());
>         }
>     }
> }
>
> I am using "CountBundleTrigger.java". But the pre-shuffle aggregation is
> not working, as the "count" variable is always 0. Please let me know if
> I am missing something.
>
> @Override
> public void onElement(T element) throws Exception {
>     count++;
>     if (count >= maxCount) {
>         callback.finishBundle();
>         reset();
>     }
> }
>
> Here is the main code.
>
> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>     @Override
>     public Long getKey(TaxiFare value) throws Exception {
>         return value.driverId;
>     }
> };
>
> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>     // fares.keyBy((TaxiFare fare) -> fare.driverId)
>     //     .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>     fares.transform("preshuffle",
>             TypeInformation.of(TaxiFare.class),
>             new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>             @Override
>             public long extractTimestamp(TaxiFare element) {
>                 return element.startTime.getEpochSecond();
>             }
>         })
>         .keyBy((TaxiFare fare) -> fare.driverId)
>         .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>         .process(new AddTips());
>
> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>     hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>
> Thanks
> Suman
>


map concurrent modification exception analysis when checkpoint

2021-08-19 Thread yidan zhao
The Flink web UI shows the exception below. In the task
(ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel), the first
operator is a broadcast process function and the second is an async
function. I do not know whether the issue is related to this.

The issue did not occur before; it started after I upgraded to Flink 1.13.2.



Exception info from the Flink web UI:

java.io.IOException: Could not perform checkpoint 58 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 58 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
    ... 20 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeria

Metrics outside of RichFunction context

2021-08-19 Thread John Karp
Hi,

I'm using StreamingFileSink to collect records into avro files. Inside of
the BulkWriter implementation, I have to do some operations (such as
dynamic schema lookup) which I want to record metrics about. However, the
BulkWriter API, as it is defined, does not accept a RuntimeContext or
MetricGroup from the StreamingFileSink, so I seemingly don't have access
to the metrics API. And it doesn't look like StreamingFileSink is
particularly extensible. What alternatives do I have besides forking
StreamingFileSink, or bypassing the Flink metrics API entirely?

Thanks,
John


Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-19 Thread Kevin Lam
Hi all,

I've noticed that sometimes when task managers go down, it looks like the
job is not restored from a checkpoint but instead restarted from a fresh
state (when I go to the job's checkpoints tab in the UI, I don't see the
restore, and the numbers in the job overview all get reset). Under what
circumstances does this happen?

I've been trying to debug and we really want the job to restore from the
checkpoint at all times for our use case.

We're running Apache Flink 1.13 on Kubernetes in a high availability
set-up.
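For context, a Kubernetes HA set-up of this kind typically carries roughly the following flink-conf.yaml entries (the cluster id and storage path below are placeholders); the HA services are what hold the pointer to the latest completed checkpoint that a recovered JobManager resumes from:

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/ha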

Thanks in advance!


Re: How can I build the flink docker image from source code?

2021-08-19 Thread Caizhi Weng
Hi!

If you only modified Java code, use mvn clean package to build Flink from
source code. After that, COPY all jars in
flink-dist/target/flink-<version>/lib into the lib directory of the latest
Flink image.
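A sketch of that approach as a Dockerfile, built from the root of the Flink source tree; the base tag and the exact path under flink-dist/target depend on the version you built and are only illustrative here:

FROM apache/flink:1.13.0-scala_2.11
# overwrite the jars shipped in the official image with the locally built ones
COPY flink-dist/target/flink-1.13.0-bin/flink-1.13.0/lib/ /opt/flink/lib/

Then build it with, for example, docker build -t my-flink:dev .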

Chenyu Zheng wrote on Thu, Aug 19, 2021 at 7:36 PM:

> Hi contributors,
>
>
>
> I’ve changed a little bit code in flink, and want to build a docker image
> to test it. Could you tell me how can I build the image from source code?
>
>
>
> Thx!
>


How can I build the flink docker image from source code?

2021-08-19 Thread Chenyu Zheng
Hi contributors,

I’ve changed a little bit code in flink, and want to build a docker image to 
test it. Could you tell me how can I build the image from source code?

Thx!


Apache Flink metrics are not aligned in the reporter

2021-08-19 Thread Jawad Tahir
Hi,

I have defined Graphite as my metrics reporter with my Flink (v1.13.2). My
pipeline is pretty simple. It consists of one source, one stateful operator
(a simple window aggregation), and one sink (the operations-playground,
basically). I have set the parallelism to 2. The graph of the pipeline
is as follows:

[Flink pipeline][1]

The program is running well and producing the correct results. However,
when I look at the metrics, I see that the source started sending records
well after the job started, even though my sink was producing correct
results from the start of the job. Here is the [graph][2] of the job's
uptime and the source's numRecordsOut. As far as I understand, a Flink
source's numRecordsOut should start increasing together with uptime, since
my sink was producing correct results from the start.


  [1]: https://i.stack.imgur.com/rZm5h.png
  [2]: https://i.stack.imgur.com/nlBoS.png


Re: Theory question on process_continuously processing mode and watermarks

2021-08-19 Thread Arvid Heise
I think what you are seeing is that the files have records with similar
timestamps. That means after reading file1 your watermark has already
progressed to the end of your time range. When Flink picks up file2, all
records are considered late and no windows fire anymore.

See [1] for a possible solution on DataStream. The Table API deals with
this much better if you use upserts [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#allowed-lateness
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/versioned_tables/
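A minimal, self-contained sketch of the allowed-lateness route from [1]; the inline source, the tuple layout and the 1 h window / 10 min lateness are illustrative assumptions, not taken from the job discussed here:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AllowedLatenessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("k", 1_000L), Tuple2.of("k", 2_000L))
           .assignTimestampsAndWatermarks(
               WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                   .withTimestampAssigner((event, ts) -> event.f1))
           .keyBy(value -> value.f0)
           .window(TumblingEventTimeWindows.of(Time.hours(1)))
           .allowedLateness(Time.minutes(10))  // late records re-fire the window instead of being dropped
           .sum(1)
           .print();

        env.execute("allowed-lateness-sketch");
    }
}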

On Thu, Aug 19, 2021 at 11:48 AM Caizhi Weng  wrote:

> Hi!
>
> FileProcessingMode.PROCESS_CONTINUOUSLY means the source continuously scans
> the file for updates; it should have nothing to do with stopping the
> streaming job.
>
> I suspect that the column you defined the watermark on contains some data
> which exceeds Long.MAX_VALUE. A Long.MAX_VALUE watermark signals the
> job to stop. You might also want to share your code on the mailing list so
> others can look into this problem more deeply.
>
> Fra wrote on Thu, Aug 19, 2021 at 5:26 PM:
>
>> Hello, during my personal development of a Flink streaming platform I
>> found something that perplexes me.
>>
>> Using FileProcessingMode.PROCESS_CONTINUOUSLY
>>
>> in a streaming job that uses tumbling windows and watermarks causes my
>> streaming process to stop at the reading-files phase.
>>
>> Meanwhile, if I delete my declarations of windows and watermarks, the
>> program works as expected.
>>
>> Is there some meaning behind this behaviour? My theory is that
>> PROCESS_CONTINUOUSLY re-reads the file and that causes a contradiction with
>> the watermarks created in the first reading of the files, causing it to stop.
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: Theory question on process_continuously processing mode and watermarks

2021-08-19 Thread Caizhi Weng
Hi!

FileProcessingMode.PROCESS_CONTINUOUSLY means the source continuously scans
the file for updates; it should have nothing to do with stopping the
streaming job.

I suspect that the column you defined the watermark on contains some data
which exceeds Long.MAX_VALUE. A Long.MAX_VALUE watermark signals the job to
stop. You might also want to share your code on the mailing list so others
can look into this problem more deeply.

Fra wrote on Thu, Aug 19, 2021 at 5:26 PM:

> Hello, during my personal development of a Flink streaming platform I
> found something that perplexes me.
>
> Using FileProcessingMode.PROCESS_CONTINUOUSLY
>
> in a streaming job that uses tumbling windows and watermarks causes my
> streaming process to stop at the reading-files phase.
>
> Meanwhile, if I delete my declarations of windows and watermarks, the
> program works as expected.
>
> Is there some meaning behind this behaviour? My theory is that
> PROCESS_CONTINUOUSLY re-reads the file and that causes a contradiction with
> the watermarks created in the first reading of the files, causing it to stop.
>
>
>
>
>
>
>
>


Re: Timer Service vs Custom Triggers

2021-08-19 Thread Caizhi Weng
Hi!

If you'd like to aggregate something over the records before the timeout,
consider using a session window (instead of writing your own trigger).
However, if no aggregation is needed, I would prefer using a process
function and handling the timers myself, because the timers registered by
the window are kept in memory for each record, and with a very large data
flow this might cause a shortage of heap memory.
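A minimal sketch of the keyBy(..).process(..) route with a processing-time timer; the String types, the 60-second timeout and the emitted summary string are illustrative assumptions, not Aeden's actual code:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimeoutSummaryFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> pendingTimer;  // timestamp of the currently registered timer

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // every element for this key pushes the timeout 60 s into the future
        Long previous = pendingTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long timeout = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(timeout);
        pendingTimer.update(timeout);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // no element arrived for 60 s: emit the timeout summary built from state
        out.collect("timeout for key " + ctx.getCurrentKey());
        pendingTimer.clear();
    }
}

It would be wired in with something like stream.keyBy(...).process(new TimeoutSummaryFunction()).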

Aeden Jameson wrote on Thu, Aug 19, 2021 at 8:07 AM:

> My use case is that I'm producing a set of measurements by key every
> 60 seconds. Currently,  this is handled with the usual pattern of
> keyBy().window(Tumbling...(60)).process(...) I need to provide the same
> output, but as a result of a timeout. The data needed for the timeout
> summary will be in the global state for that key. This seems possible by
> either using the timer service in the process function without a window
> (e.g. keyBy(..).process(..)) or by using a custom trigger. Why choose one
> or the other?
>
> --
> Thanks,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>
>


Theory question on process_continuously processing mode and watermarks

2021-08-19 Thread Fra
Hello, during my personal development of a Flink streaming platform I found something that perplexes me.

Using FileProcessingMode.PROCESS_CONTINUOUSLY

in a streaming job that uses tumbling windows and watermarks causes my streaming process to stop at the reading-files phase.

Meanwhile, if I delete my declarations of windows and watermarks, the program works as expected.

Is there some meaning behind this behaviour? My theory is that PROCESS_CONTINUOUSLY re-reads the file and that causes a contradiction with the watermarks created in the first reading of the files, causing it to stop.


Re: Setting S3 parameters in a K8 jobmanager deployment

2021-08-19 Thread Yang Wang
I am afraid jobmanager.sh [1] cannot parse the "-D" arguments correctly at the moment.

[1].
https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/jobmanager.sh


Best,
Yang
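For reference, the equivalent flink-conf.yaml entries, using the placeholder values from the deployment below, would be:

s3.endpoint: https://192.173.0.0:9000
s3.access-key: key
s3.secret-key: secret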

Robert Cullen wrote on Wed, Aug 18, 2021 at 10:21 PM:

> I have a kubernetes jobmanager deployment that requires parameters be
> passed as command line rather than retrieving values from the flink-config
> map. Is there a way to do this?
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   replicas: 1 # Set the value to greater than 1 to start standby JobManagers
>   selector:
> matchLabels:
>   app: flink
>   component: jobmanager
>   template:
> metadata:
>   labels:
> app: flink
> component: jobmanager
> spec:
>   containers:
>   - name: jobmanager
> image: apache/flink:1.13.0-scala_2.11
> args: ["jobmanager", "-Ds3.endpoint=https://192.173.0.0:9000";, 
> "-Ds3.access-key=key", "Ds3.secret-key=secret"]
> ports
>
> Robert Cullen
> 240-475-4490
>