Re: NullPointerException in StateTable.put()

2021-08-16 Thread László Ciople
Ok, thank you for the tips. I will modify it and get back to you :)

On Tue, Aug 17, 2021 at 9:42 AM David Morávek  wrote:

> Hi Laszlo,
>
> Please use reply-all for mailing list replies. This may help others
> find their answer in the future ;)
>
>
>> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
>
>
> This part will again make your key non-deterministic, because you're using
> a memory address inside the content for hashing. I don't see any other
> problem in the snippet you've sent.
>
> Best,
> D.
>
> On Tue, Aug 17, 2021 at 8:33 AM László Ciople 
> wrote:
>
>> I modified the code to use a sha256 hash instead of the hashCode when the
>> id is not present in the object. The same behaviour was manifested still.
>> Here is the code that selects the key:
>> @Override
>> public String getKey(AzureADIamEvent value) throws Exception {
>> // key is the device id or the hash of the device properties
>> String key = value.payload.properties.deviceDetail.deviceId;
>>
>> if (key == null || key.equals("")) {
>> LOG.warn("Device id is null or empty, using sha256 value");
>> key = DigestUtils.sha256Hex(value.payload.properties.deviceDetail.toString());
>> }
>>
>> return key;
>> }
>>
>> And the definition of the class the key is created from:
>> public class DeviceDetail {
>> @JsonProperty("browser")
>> public String browser;
>> @JsonProperty("deviceId")
>> public String deviceId;
>> @JsonProperty("displayName")
>> public String displayName;
>> @JsonProperty("operatingSystem")
>> public String operatingSystem;
>> @JsonProperty("trustType")
>> public String trustType;
>> @Override
>> public String toString() {
>> StringBuilder sb = new StringBuilder();
>> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
>> sb.append("browser");
>> sb.append('=');
>> sb.append(((this.browser == null)?"":this.browser));
>> sb.append(',');
>> sb.append("deviceId");
>> sb.append('=');
>> sb.append(((this.deviceId == null)?"":this.deviceId));
>> sb.append(',');
>> sb.append("displayName");
>> sb.append('=');
>> sb.append(((this.displayName == null)?"":this.displayName));
>> sb.append(',');
>> sb.append("operatingSystem");
>> sb.append('=');
>> sb.append(((this.operatingSystem == null)?"":this.operatingSystem));
>> sb.append(',');
>> sb.append("trustType");
>> sb.append('=');
>> sb.append(((this.trustType == null)?"":this.trustType));
>> sb.append(',');
>> if (sb.charAt((sb.length()- 1)) == ',') {
>> sb.setCharAt((sb.length()- 1), ']');
>> } else {
>> sb.append(']');
>> }
>> return sb.toString();
>> }
>> }
>>
>>
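For illustration, a minimal Java sketch of the change David suggests (hash only the content of DeviceDetail, never System.identityHashCode), assuming the same commons-codec DigestUtils dependency and the field names from the snippet above:

import org.apache.commons.codec.digest.DigestUtils;

public final class DeviceKeys {

    private DeviceKeys() {}

    /**
     * Builds a deterministic key from the device fields only, so the same
     * content hashes to the same value on every JVM / TaskManager.
     */
    public static String deterministicKey(DeviceDetail d) {
        if (d.deviceId != null && !d.deviceId.isEmpty()) {
            return d.deviceId;
        }
        String content = String.join("|",
                nullToEmpty(d.browser),
                nullToEmpty(d.displayName),
                nullToEmpty(d.operatingSystem),
                nullToEmpty(d.trustType));
        return DigestUtils.sha256Hex(content);
    }

    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}

With a key selector built on this, the same device properties always map to the same key, no matter which JVM computes it.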


Re: RabbitMQ 3.9+ Native Streams

2021-08-16 Thread David Morávek
This would be awesome! We have the contribution guide
 [1] that
should give you a rough idea on how to approach the contribution. Let me
know if you need any further guidance, I'd be happy to help ;)

[1] https://flink.apache.org/contributing/how-to-contribute.html

Best,
D.

On Tue, Aug 17, 2021 at 1:17 AM Rob Englander 
wrote:

> I will definitely consider the contribution idea :)
>
>
> On Mon, Aug 16, 2021 at 3:16 PM David Morávek  wrote:
>
>> Hi Rob,
>>
>> there is currently no on-going effort for this topic, I think this would
>> be a really great contribution though. This seems to be pushing RabbitMQ
>> towards new usages ;)
>>
>> Best,
>> D.
>>
>> On Mon, Aug 16, 2021 at 8:16 PM Rob Englander 
>> wrote:
>>
>>> I'm wondering if there's any work underway to develop
>>> DataSource/DataSink for RabbitMQ's streams recently released in RMQ 3.9?
>>>
>>> Rob Englander
>>>
>>


Re: NullPointerException in StateTable.put()

2021-08-16 Thread David Morávek
Hi Laszlo,

Please use reply-all for mailing list replies. This may help others find
their answer in the future ;)


> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');


This part will again make your key non-deterministic, because you're using
a memory address inside the content for hashing. I don't see any other
problem in the snippet you've sent.

Best,
D.

On Tue, Aug 17, 2021 at 8:33 AM László Ciople 
wrote:

> I modified the code to use a sha256 hash instead of the hashCode when the
> id is not present in the object. The same behaviour was manifested still.
> Here is the code that selects the key:
> @Override
> public String getKey(AzureADIamEvent value) throws Exception {
> // key is the device id or the hash of the device properties
> String key = value.payload.properties.deviceDetail.deviceId;
>
> if (key == null || key.equals("")) {
> LOG.warn("Device id is null or empty, using sha256 value");
> key = DigestUtils.sha256Hex(value.payload.properties.deviceDetail.toString());
> }
>
> return key;
> }
>
> And the definition of the class the key is created from:
> public class DeviceDetail {
> @JsonProperty("browser")
> public String browser;
> @JsonProperty("deviceId")
> public String deviceId;
> @JsonProperty("displayName")
> public String displayName;
> @JsonProperty("operatingSystem")
> public String operatingSystem;
> @JsonProperty("trustType")
> public String trustType;
> @Override
> public String toString() {
> StringBuilder sb = new StringBuilder();
> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
> sb.append("browser");
> sb.append('=');
> sb.append(((this.browser == null)?"":this.browser));
> sb.append(',');
> sb.append("deviceId");
> sb.append('=');
> sb.append(((this.deviceId == null)?"":this.deviceId));
> sb.append(',');
> sb.append("displayName");
> sb.append('=');
> sb.append(((this.displayName == null)?"":this.displayName));
> sb.append(',');
> sb.append("operatingSystem");
> sb.append('=');
> sb.append(((this.operatingSystem == null)?"":this.operatingSystem));
> sb.append(',');
> sb.append("trustType");
> sb.append('=');
> sb.append(((this.trustType == null)?"":this.trustType));
> sb.append(',');
> if (sb.charAt((sb.length()- 1)) == ',') {
> sb.setCharAt((sb.length()- 1), ']');
> } else {
> sb.append(']');
> }
> return sb.toString();
> }
> }
>
>


Re: Can i contribute for flink doc ?

2021-08-16 Thread Caizhi Weng
Hi!

Thanks for your interest in contributing to Flink. Currently most of the
committers are busy with the upcoming Flink 1.14, so few people may have
their eyes on new PRs, especially ones that are not linked to a JIRA issue.

Please follow Jing Zhang's advice by first creating the corresponding JIRA
tickets and relating your PRs to them. After that, ping me (@tsreaper) in
your PR and I'll go and take a look.

On Tue, Aug 17, 2021 at 11:53 AM JING ZHANG wrote:

> Hi Camile,
> First of all, thanks for the great contribution, the document improvement
> is very helpful.
>
> but I don't know why nobody has merged them or left a comment.
>>
> Maybe we could try the following way to start the first contribution;
> please see document [1] for more detailed information.
> 1. Please make sure there exists a Flink Jira issue that corresponds to
> your contribution. We require all documentation changes to refer to a Jira
> issue, except for trivial fixes such as typos.
> 2. Have a look at the Documentation Style Guide for some guidance
> on how to write accessible, consistent and inclusive documentation.
> 3. Attach your pull request to the newly created Flink JIRA; the commit
> message should point to the corresponding Jira issue by starting with
> [FLINK-].
> Then one or more committers will review the pull request and finally
> merge it.
>
> [1] https://flink.apache.org/contributing/contribute-documentation.html
>
> Best,
> JING ZHANG
>
> On Tue, Aug 17, 2021 at 10:57 AM Camile Sing wrote:
>
>> Hi, all
>> I'm a Flink user. Recently I found some problems when using Flink, and it
>> took some time to understand the internal mechanisms. This really helped me
>> learn more about Flink, but I think the docs could be clearer, so I opened
>> some merge requests for the docs:
>> - https://github.com/apache/flink/pull/16823
>> - https://github.com/apache/flink/pull/16683
>>
>> but I don't know why nobody has merged them or left a comment.
>>
>


Re: Handling HTTP Requests for Keyed Streams

2021-08-16 Thread Caizhi Weng
Hi!

As you mentioned, the configuration fetching is very infrequent, so why not
use a blocking approach to send the HTTP requests and receive the
responses? This seems like a more reasonable solution to me.
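For illustration, a minimal Java sketch of this blocking approach (the endpoint URL and the String types are placeholders; it assumes Java 11's built-in java.net.http.HttpClient and caches the result in keyed state, so the call happens at most once per key):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BlockingConfigLookup extends KeyedProcessFunction<String, String, String> {

    private transient HttpClient httpClient;
    private transient ValueState<String> configState;

    @Override
    public void open(Configuration parameters) {
        httpClient = HttpClient.newHttpClient();
        configState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("tenant-config", String.class));
    }

    @Override
    public void processElement(String message, Context ctx, Collector<String> out) throws Exception {
        if (configState.value() == null) {
            // Blocking call, executed only once per tenant key, so the cost is amortized.
            HttpRequest request = HttpRequest.newBuilder(
                    URI.create("https://config.example.com/tenants/" + ctx.getCurrentKey())).build();
            HttpResponse<String> response =
                    httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            configState.update(response.body());
        }
        out.collect(message);
    }
}

Because the fetched value is kept in ValueState for the key, later elements of the same tenant never block again.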

On Tue, Aug 17, 2021 at 4:00 AM Rion Williams wrote:

> Hi all,
>
> I've been exploring a few different options for storing tenant-specific
> configurations within Flink state based on the messages I have flowing
> through my job. Initially I had considered creating a source that would
> periodically poll an HTTP API and connect that stream to my original event
> stream.
>
> However, I realized that this configuration information would basically
> never change and thus it doesn't quite make sense to poll so frequently. My
> next approach would be to have a function that would be keyed (by tenant)
> and store the configuration for that tenant in state (and issue an HTTP
> call when I did not have it). Something like this:
>
> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
> // Tenant specific configuration
> private lateinit var httpClient: HttpClient
> private lateinit var configuration: ValueState
>
> override fun open(parameters: Configuration) {
> super.open(parameters)
> httpClient = HttpClient.newHttpClient()
> }
>
> override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
> if (configuration.value() == null){
> // Issue a request to the appropriate API to load the configuration
> val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
> httpClient.send(..., {
> // Store the configuration info within state here
> configuration.update(...)
> })
>
> out.collect(message)
> }
> else {
> // Get the configuration information and pass it downstream to be used by the sink
> out.collect(message)
> }
> }
> }
>
> I didn't see any support for using the Async I/O functions from a keyed
> context, otherwise I'd imagine that would be ideal. The requests themselves
> should be very infrequent (initial call per tenant) and I'd imagine after
> that the necessary configuration could be pulled/stored within the state
> for that key.
>
> Is there a good way of handling this that I might be overlooking with an
> existing Flink construct or function? I'd love to be able to leverage the
> Async I/O connectors as they seem pretty well thought out.
>
> Thanks in advance!
>
> Rion
>
>
>


Re: Can i contribute for flink doc ?

2021-08-16 Thread JING ZHANG
Hi Camile,
First of all, thanks for the great contribution, the document improvement
is very helpful.

but I don't know why nobody has merged them or left a comment.
>
Maybe we could try the following way to start the first contribution;
please see document [1] for more detailed information.
1. Please make sure there exists a Flink Jira issue that corresponds to your
contribution. We require all documentation changes to refer to a Jira
issue, except for trivial fixes such as typos.
2. Have a look at the Documentation Style Guide for some guidance
on how to write accessible, consistent and inclusive documentation.
3. Attach your pull request to the newly created Flink JIRA; the commit
message should point to the corresponding Jira issue by starting with
[FLINK-].
Then one or more committers will review the pull request and finally
merge it.

[1] https://flink.apache.org/contributing/contribute-documentation.html

Best,
JING ZHANG

On Tue, Aug 17, 2021 at 10:57 AM Camile Sing wrote:

> Hi, all
> I'm a Flink user. Recently I found some problems when using Flink, and it
> took some time to understand the internal mechanisms. This really helped me
> learn more about Flink, but I think the docs could be clearer, so I opened
> some merge requests for the docs:
> - https://github.com/apache/flink/pull/16823
> - https://github.com/apache/flink/pull/16683
>
> but I don't know why nobody has merged them or left a comment.
>


Re: Windowed Aggregation With Event Time over a Temporary View

2021-08-16 Thread JING ZHANG
Hi Joe,
>
> caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of
> type '$TUMBLE(, )'. Supported form(s):
> '$TUMBLE(, )'

The first parameter of a group window function [1] must be a field with a
time attribute [2].
In your case, the first parameter of the TUMBLE window function is
`max_event_time`, which is not a field with a time attribute.
It originates from max(event_time) as max_event_time in the first,
unbounded aggregate. Please note that
the result of an aggregate function cannot be a field with a time attribute,
even if it works on a time attribute (and in your case,
the `max` function works on event_time, which is not a field with a time
attribute either).

> Is there a way to make this work?

First, we need to define the time attribute when converting a DataStream to
a Table [3]; please refer to document [3] for detailed information.

Secondly, we need to propagate the time attribute to the next SQL query.
An unbounded aggregate cannot propagate the time attribute, but a window
aggregate can. For example, TUMBLE_ROWTIME, HOP_ROWTIME and SESSION_ROWTIME
return a field with a rowtime attribute [4]; see the sketch below.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time

[3]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#selecting-group-window-start-and-end-timestamps
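For illustration, a hedged Java sketch of the two steps above (it restructures the first aggregation into a windowed aggregate, as suggested; field names follow Joe's query, the 1-second windows are placeholders, and it assumes the DataStream already has timestamps and watermarks assigned so event_time can be declared as a rowtime attribute):

import static org.apache.flink.table.api.Expressions.$;

// Declare event_time as a rowtime attribute while converting the DataStream (Flink 1.12 style).
Table workLogTable = tableEnv.fromDataStream(workLogStream,
        $("id"), $("operation_id"), $("operation"),
        $("start_ts"), $("end_ts"), $("audit_ts"),
        $("event_time").rowtime());
tableEnv.createTemporaryView("work_log", workLogTable);

// Inner query: a windowed (not unbounded) aggregate, so TUMBLE_ROWTIME(...) can
// hand a rowtime attribute on to the next query.
tableEnv.createTemporaryView("work_log_cnt", tableEnv.sqlQuery(
        "SELECT operation_id, operation, " +
        "       MAX(start_ts) AS start_ts, MAX(end_ts) AS end_ts, " +
        "       COUNT(*) AS item_count, " +
        "       TUMBLE_ROWTIME(event_time, INTERVAL '1' SECOND) AS max_event_time " +
        "FROM work_log " +
        "GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND), operation_id, operation"));

// Outer query can now window on max_event_time because it is still a rowtime attribute.
tableEnv.executeSql(
        "SELECT operation, AVG(item_count) AS average_item_count " +
        "FROM work_log_cnt " +
        "GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();

Note that TUMBLE_ROWTIME must be called with the same arguments as the TUMBLE in the GROUP BY clause; the resulting max_event_time is again a rowtime attribute, so the outer query can window on it.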

Best,
JING ZHANG

On Mon, Aug 16, 2021 at 9:45 PM Joseph Lorenzini wrote:

> Hi all,
>
>
>
> I am on Flink 1.12.3.
>
>
>
> So here’s the scenario: I have a Kafka topic as a source, where each
> record repsents a change to an append only audit log. The kafka record has
> the following fields:
>
>
>
>- id (unique identifier for that audit log entry)
>- operation id (is shared across multiple records)
>- operation (string)
>- start_ts (TIMESTAMP(3))
>- end_ts (TIMESTAMP(3))
>
>
>
> I am trying to calculate the average item count and duration per
> operation. I first converted the kafka source to an append only data stream
> and then I attempted to run the following SQL:
>
>
>
>   Table workLogTable = tableEnv.fromDataStream(workLogStream)
>
>   tableEnv.createTemporaryView("work_log", workLogTable);
>
>  Table workLogCntTable = tableEnv.sqlQuery("select operation_id,
> operation, max(start_ts) as start_ts, max(end_ts) as end_ts, count(*) as
> item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time" +
>
> " FROM work_log GROUP BY operation_id, operation");
>
> tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);
>
> tableEnv.executeSql("select max(audit_ts), operation,
> avg(item_count) as average_item_count, AVG(end_ts - start_ts) as
> avg_duration from" +
>
> " work_log_cnt" +
>
> " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND),
> operation").print();
>
>
>
> The problem I am having is that I am unable to preserve the event time
> between the first view and the second. I am getting this error:
>
>
>
> caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of
> type '$TUMBLE(, )'. Supported form(s):
> '$TUMBLE(, )'
>
>
>
> My guess is that the max function in the first query is converting the
> event time from DATETIME type to a BigInt. I am not sure how to apply an
> aggregate to the event time in the first query such that the event time
> from the original kafka stream can be used in the second view. Is there a
> way to make this work?
>
>
>
> Thanks,
>
> Joe
>
>
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>


Can i contribute for flink doc ?

2021-08-16 Thread Camile Sing
Hi, all
I'm a Flink user. Recently I found some problems when using Flink, and it
took some time to understand the internal mechanisms. This really helped me
learn more about Flink, but I think the docs could be clearer, so I opened
some merge requests for the docs:
- https://github.com/apache/flink/pull/16823
- https://github.com/apache/flink/pull/16683

but I don't know why nobody has merged them or left a comment.


Re: redis sink from flink

2021-08-16 Thread Yik San Chan
By the way, this post in Chinese showed how we do it exactly with code.

https://yiksanchan.com/posts/flink-bulk-insert-redis

And yes, it has buffered writes support by leveraging Flink operator state
and Redis pipelining. Feel free to let me know if you have any questions.


On Tue, Aug 17, 2021 at 10:15 AM Yik San Chan 
wrote:

> Hi Jin,
>
> I was in the same shoes. I tried bahir redis connector at first, then I
> felt it was very limited, so I rolled out my own. It was actually quite
> straightforward.
>
> All you need to do is to extend RichSinkFunction, then put your logic
> inside. Regarding Redis clients, Jedis (https://github.com/redis/jedis)
> is quite popular and simple to get started.
>
> Let me know if you'd love to learn more details about our implementation.
>
> Best,
> Yik San
>
> On Tue, Aug 17, 2021 at 9:15 AM Jin Yi  wrote:
>
>> is apache bahir still a thing?  it hasn't been touched for months (since
>> redis 2.8.5).
>>
>> as such, looking at the current flink connector docs, it's no longer
>> pointing to anything from the bahir project.  looking around in either the
>> flink or bahir newsgroups doesn't turn up anything regarding bahir's EOL.
>>
>> is the best bet for a flink to redis sink something i roll on my own
>> (inclined to go this route w/ buffered writes)?  or should i try going
>> through via kafka and using confluent's kafka redis connector (flink =>
>> kafka => redis)?
>>
>


Re: redis sink from flink

2021-08-16 Thread Yik San Chan
Hi Jin,

I was in the same shoes. I tried bahir redis connector at first, then I
felt it was very limited, so I rolled out my own. It was actually quite
straightforward.

All you need to do is to extend RichSinkFunction, then put your logic
inside. Regarding Redis clients, Jedis (https://github.com/redis/jedis) is
quite popular and simple to get started.
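To make that concrete, a minimal Java sketch (host, port and the Tuple2 key/value mapping are placeholders; no batching, retries or error handling):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import redis.clients.jedis.Jedis;

public class SimpleRedisSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // One connection per parallel sink instance.
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        // f0 is used as the Redis key, f1 as the value.
        jedis.set(value.f0, value.f1);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

Attaching it is just stream.addSink(new SimpleRedisSink()); buffering or pipelining can be layered on top of this.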

Let me know if you'd love to learn more details about our implementation.

Best,
Yik San

On Tue, Aug 17, 2021 at 9:15 AM Jin Yi  wrote:

> is apache bahir still a thing?  it hasn't been touched for months (since
> redis 2.8.5).
>
> as such, looking at the current flink connector docs, it's no longer
> pointing to anything from the bahir project.  looking around in either the
> flink or bahir newsgroups doesn't turn up anything regarding bahir's EOL.
>
> is the best bet for a flink to redis sink something i roll on my own
> (inclined to go this route w/ buffered writes)?  or should i try going
> through via kafka and using confluent's kafka redis connector (flink =>
> kafka => redis)?
>


Re: redis sink from flink

2021-08-16 Thread Yangze Guo
Hi, Jin

IIUC, the DataStream connector `RedisSink` can still be used. However,
the Table API connector `RedisTableSink` might not work (at least in
the future) because it is implemented based on the deprecated Table
connector abstraction. You can still give it a try, though.
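For reference, a hedged sketch of what using that DataStream RedisSink looks like, assuming the Bahir flink-connector-redis dependency is on the classpath (the Tuple2 stream and the SET command are just an example):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkExample {

    public static void attachSink(DataStream<Tuple2<String, String>> stream) {
        FlinkJedisPoolConfig conf =
                new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
        stream.addSink(new RedisSink<>(conf, new KeyValueMapper()));
    }

    /** Maps each Tuple2 to a Redis SET command: f0 is the key, f1 the value. */
    public static class KeyValueMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}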

Best,
Yangze Guo

On Tue, Aug 17, 2021 at 9:15 AM Jin Yi  wrote:
>
> is apache bahir still a thing?  it hasn't been touched for months (since 
> redis 2.8.5).
>
> as such, looking at the current flink connector docs, it's no longer pointing 
> to anything from the bahir project.  looking around in either the flink or 
> bahir newsgroups doesn't turn up anything regarding bahir's EOL.
>
> is the best bet for a flink to redis sink something i roll on my own 
> (inclined to go this route w/ buffered writes)?  or should i try going 
> through via kafka and using confluent's kafka redis connector (flink => kafka 
> => redis)?


Re: Flink taskmanager in crash loop

2021-08-16 Thread Yangze Guo
Hi, Abhishek,

Do you see something like "Fatal error occurred while executing the
TaskManager" in your log or would you like to provide the whole task
manager log?

Best,
Yangze Guo

On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai  wrote:
>
> Hello,
>
> In our production environment, running Flink 1.13 (Scala 2.11), where Flink 
> has been working without issues with a dozen or so jobs running for a while, 
> Flink taskmanager started crash looping with a period of ~4 minutes per 
> crash.  The stack trace is not very informative, therefore reaching out for 
> help, see below.
>
> The only other thing that's unusual is that due to what might be a product 
> issue (custom job code running on Flink), some or all of our tasks are also 
> in a crash loop.  Still, I wasn't expecting taskmanager itself to die.  Does 
> taskmanager have some built in feature to crash if all/most tasks are 
> crashing?
>
> 2021-08-16 15:58:23.984 [main] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating 
> TaskManagerRunner with exit code 1.
> org.apache.flink.util.FlinkException: Unexpected failure during runtime of 
> TaskManagerRunner.
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
>   at java.base/java.security.AccessController.doPrivileged(Native Method)
>   at java.base/javax.security.auth.Subject.doAs(Unknown Source)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:413)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:396)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:354)
> Caused by: java.util.concurrent.TimeoutException: null
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source)
>   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>   at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> 2021-08-16 15:58:23.986 [TaskExecutorLocalStateStoresManager shutdown hook] 
> INFO  o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting 
> down TaskExecutorLocalStateStoresManager.
>
>
> Thanks very much!
>
> Abhishek


redis sink from flink

2021-08-16 Thread Jin Yi
is apache bahir still a thing?  it hasn't been touched for months (since
redis 2.8.5).

as such, looking at the current flink connector docs, it's no longer
pointing to anything from the bahir project.  looking around in either the
flink or bahir newsgroups doesn't turn up anything regarding bahir's EOL.

is the best bet for a flink to redis sink something i roll on my own
(inclined to go this route w/ buffered writes)?  or should i try going
through via kafka and using confluent's kafka redis connector (flink =>
kafka => redis)?


Re: RabbitMQ 3.9+ Native Streams

2021-08-16 Thread Rob Englander
I will definitely consider the contribution idea :)


On Mon, Aug 16, 2021 at 3:16 PM David Morávek  wrote:

> Hi Rob,
>
> there is currently no on-going effort for this topic, I think this would
> be a really great contribution though. This seems to be pushing RabbitMQ
> towards new usages ;)
>
> Best,
> D.
>
> On Mon, Aug 16, 2021 at 8:16 PM Rob Englander 
> wrote:
>
>> I'm wondering if there's any work underway to develop DataSource/DataSink
>> for RabbitMQ's streams recently released in RMQ 3.9?
>>
>> Rob Englander
>>
>


Flink taskmanager in crash loop

2021-08-16 Thread Abhishek Rai
Hello,

In our production environment, running Flink 1.13 (Scala 2.11), where Flink
has been working without issues with a dozen or so jobs running for a
while, Flink taskmanager started crash looping with a period of ~4 minutes
per crash.  The stack trace is not very informative, therefore reaching out
for help, see below.

The only other thing that's unusual is that due to what might be a product
issue (custom job code running on Flink), some or all of our tasks are also
in a crash loop.  Still, I wasn't expecting taskmanager itself to die.
Does taskmanager have some built in feature to crash if all/most tasks are
crashing?

2021-08-16 15:58:23.984 [main] ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating
TaskManagerRunner with exit code 1.
org.apache.flink.util.FlinkException: Unexpected failure during
runtime of TaskManagerRunner.
  at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
  at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
  at java.base/java.security.AccessController.doPrivileged(Native Method)
  at java.base/javax.security.auth.Subject.doAs(Unknown Source)
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
  at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:413)
  at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:396)
  at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:354)
Caused by: java.util.concurrent.TimeoutException: null
  at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
  at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
  at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source)
  at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
  at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
  at java.base/java.lang.Thread.run(Unknown Source)
2021-08-16 15:58:23.986 [TaskExecutorLocalStateStoresManager shutdown
hook] INFO  o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager
 - Shutting down TaskExecutorLocalStateStoresManager.


Thanks very much!

Abhishek


Handling HTTP Requests for Keyed Streams

2021-08-16 Thread Rion Williams
Hi all,

I've been exploring a few different options for storing tenant-specific
configurations within Flink state based on the messages I have flowing
through my job. Initially I had considered creating a source that would
periodically poll an HTTP API and connect that stream to my original event
stream.

However, I realized that this configuration information would basically
never change and thus it doesn't quite make sense to poll so frequently. My
next approach would be to have a function that would be keyed (by tenant)
and store the configuration for that tenant in state (and issue an HTTP
call when I did not have it). Something like this:

class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
    // Tenant specific configuration
    private lateinit var httpClient: HttpClient
    private lateinit var configuration: ValueState

    override fun open(parameters: Configuration) {
        super.open(parameters)
        httpClient = HttpClient.newHttpClient()
    }

    override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
        if (configuration.value() == null) {
            // Issue a request to the appropriate API to load the configuration
            val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
            httpClient.send(..., {
                // Store the configuration info within state here
                configuration.update(...)
            })

            out.collect(message)
        }
        else {
            // Get the configuration information and pass it downstream to be used by the sink
            out.collect(message)
        }
    }
}

I didn't see any support for using the Async I/O functions from a keyed
context, otherwise I'd imagine that would be ideal. The requests themselves
should be very infrequent (initial call per tenant) and I'd imagine after
that the necessary configuration could be pulled/stored within the state
for that key.

Is there a good way of handling this that I might be overlooking with an
existing Flink construct or function? I'd love to be able to leverage the
Async I/O connectors as they seem pretty well thought out.

Thanks in advance!

Rion


Re: RabbitMQ 3.9+ Native Streams

2021-08-16 Thread David Morávek
Hi Rob,

there is currently no on-going effort for this topic, I think this would be
a really great contribution though. This seems to be pushing RabbitMQ
towards new usages ;)

Best,
D.

On Mon, Aug 16, 2021 at 8:16 PM Rob Englander 
wrote:

> I'm wondering if there's any work underway to develop DataSource/DataSink
> for RabbitMQ's streams recently released in RMQ 3.9?
>
> Rob Englander
>


RE: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-16 Thread Hailu, Andreas [Engineering]
Hi David,

You’re correct about classpathing problems – thanks for your help in spotting 
them. I was able to get past that exception by removing some conflicting 
packages in my shaded JAR, but I’m seeing something else that’s interesting. 
With the 2 threads trying to submit jobs, one of the threads is able to submit and 
process data successfully, while the other thread fails.

Log snippet:
2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56247060 for delp)
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain Kerberos 
security token for HBase
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available (not 
packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job graph 
to local resource fail.
2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor - Submitting 
application master application_1628992879699_11275
2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running data 
flow for thread-2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
at 
com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
...
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
...
2021-08-16 13:43:20,979 [thread-1] INFO  TimelineClientImpl - Timeline service 
address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted application 
application_1628992879699_11275
2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting for 
the cluster to be allocated
2021-08-16 13:43:21,379 [thread-1] INFO  YarnClusterDescriptor - Deploying 
cluster, current state ACCEPTED
2021-08-16 13:43:28,435 [thread-1] INFO  YarnClusterDescriptor - YARN 
application has been deployed successfully.
2021-08-16 13:43:28,436 [thread-1] INFO  YarnClusterDescriptor - Found Web 
Interface d279536-023.dc.gs.com:41019 of application 
'application_1628992879699_11275'.
2021-08-16 13:43:28,443 [thread-1] INFO  AbstractJobClusterExecutor - Job has 
been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO  FlinkJobSubmitter$2 - 
job completed for thread-2 with parallelism 1
Program execution finished
Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished.

I’ve generated and sent you a signup link to our firm’s secure document-sharing 
app called Lockbox. In the repository, I’ve uploaded both our full client and 
YARN app logs (named half_failure-client_log and half_failure-yarn-log, 
respectively) in a directory named Flink support logs/Flink 1.11/1.11.2_POC. 
The logs are quite brief – would you be able to have a look at see if you can 
see if there’s something we’re doing that’s clearly wrong?

Something I did notice is that with the upgrade, our submissions are now using 
the introduction of this ContextEnvironment#executeAsync method. If it means 
a

RabbitMQ 3.9+ Native Streams

2021-08-16 Thread Rob Englander
I'm wondering if there's any work underway to develop DataSource/DataSink
for RabbitMQ's streams recently released in RMQ 3.9?

Rob Englander


Keystore format limitations for TLS

2021-08-16 Thread Alexis Sarda-Espinosa
Hello,

I am trying to configure TLS communication for a Flink cluster running on 
Kubernetes. I am currently using the BCFKS format and setting that as default 
via javax.net.ssl.keystoretype and javax.net.ssl.truststoretype (which are 
injected in the environment variable FLINK_ENV_JAVA_OPTS). The task manager is 
failing with "Invalid Keystore format", so I'm wondering if there are special 
limitations with regards to supported TLS configurations?

Regards,
Alexis.



Windowed Aggregation With Event Time over a Temporary View

2021-08-16 Thread Joseph Lorenzini




Hi all,
 
I am on Flink 1.12.3. 
 
So here’s the scenario: I have a Kafka topic as a source, where each record represents a change to an append only audit log. The kafka record has the following fields:
 

id (unique identifier for that audit log entry)
operation id (is shared across multiple records)
operation (string)
start_ts (TIMESTAMP(3))
end_ts (TIMESTAMP(3))
 
I am trying to calculate the average item count and duration per operation. I first converted the kafka source to an append only data stream and then I attempted to run the following SQL:
 
  Table workLogTable = tableEnv.fromDataStream(workLogStream)  

  tableEnv.createTemporaryView("work_log", workLogTable);
 Table workLogCntTable = tableEnv.sqlQuery("select operation_id, operation, max(start_ts) as start_ts, max(end_ts) as end_ts, count(*) as item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time" +
    " FROM work_log GROUP BY operation_id, operation");
    tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);
    tableEnv.executeSql("select max(audit_ts), operation, avg(item_count) as average_item_count, AVG(end_ts - start_ts) as avg_duration from" +
    " work_log_cnt" +
    " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();
 
The problem I am having is that I am unable to preserve the event time between the first view and the second. I am getting this error:
 
caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s):
 '$TUMBLE(, )'
 
My guess is that the max function in the first query is converting the event time from DATETIME type to a BigInt. I am not sure how to apply an aggregate to the event time in the first query such that the event time from the original
 kafka stream can be used in the second view. Is there a way to make this work? 

 
Thanks,
Joe 
 

Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should
 destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate
 to the official business of my firm shall be understood as neither given nor endorsed by it.




Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-16 Thread Rion Williams
Thanks for this suggestion David, it's extremely helpful.

Since this will vary depending on the elements retrieved from a separate
stream, I'm guessing something like the following would be roughly the
avenue to continue down:

fun main(args: Array) {
val parameters = mergeParametersFromProperties(args)
val stream = StreamExecutionEnvironment.getExecutionEnvironment()

// Get the stream for tenant-specific Elastic configurations
val connectionStream = stream
.fromSource(
KafkaSource.of(parameters, listOf("elastic-configs")),
WatermarkStrategy.noWatermarks(),
"elastic-configs"
)

// Get the stream of incoming messages to be routed to Elastic
stream
.fromSource(
KafkaSource.of(parameters, listOf("messages")),
WatermarkStrategy.noWatermarks(),
"messages"
)
.keyBy { message ->
// Key by the tenant in the message
message.getTenant()
}
.connect(
// Connect the messages stream with the configurations
connectionStream
)
.process(object : KeyedCoProcessFunction() {
// For this key, we need to store all of the previous messages in state
// in the case where we don't have a given mapping for this tenant yet
lateinit var messagesAwaitingConfigState: ListState
lateinit var configState: ValueState

override fun open(parameters: Configuration) {
super.open(parameters)
// Initialize the states
messagesAwaitingConfigState =
runtimeContext.getListState(awaitingStateDesc)
configState = runtimeContext.getState(configStateDesc)
}

// When an element is received
override fun processElement1(message: String, context: Context, out: Collector) {
// Check if we have a mapping
if (configState.value() == null){
// We don't have a mapping for this tenant, store messages until we get it
messagesAwaitingConfigState.add(message)
}
else {
// Output the record with some indicator of the route?
out.collect(message)
}
}

override fun processElement2(config: String, context: Context, out: Collector) {
// If this mapping is for this specific tenant, store it and flush the pending
// records in state
if (config.getTenant() == context.currentKey){
configState.update(config)
val messagesToFlush = messagesAwaitingConfigState.get()
messagesToFlush.forEach { message ->
out.collect(message)
}
}
}

// State descriptors
val awaitingStateDesc = ListStateDescriptor(
"messages-awaiting-config",
TypeInformation.of(String::class.java)
)

val configStateDesc = ValueStateDescriptor(
"elastic-config",
TypeInformation.of(String::class.java)
)
})

stream.executeAsync("$APPLICATION_NAME-job")
}

Basically, connect my tenant-specific configuration stream with my incoming
messages (keyed by tenant) and buffer them until I have a corresponding
configuration (to avoid race conditions). However, I'm guessing that rather
than directly outputting the messages from this process function, I'd
construct some type of wrapper here with the necessary routing/configuration
for the message (obtained via the configuration stream) along with the
element, which might be something like a MessageWrapper, and pass those
elements to the sink, which would create the tenant-specific Elastic
connection from the ConfigurationT element, handle caching it, and then just
grab the element and send it on its way?

Those are really the only bits I'm stuck on at the moment:

   1. The shape of the elements being evicted from the process function (Is
   a simple wrapper with the configuration for the sink enough here? Do I need
   to explicitly initialize the sink within this function? Etc.)
   2. The actual use of the DynamicElasticsearchSink class (Would it just
   be something like an .addSink(DynamicElasticSearchSink()) or perhaps something else entirely?)

Thanks again so much for the advice thus far David, it's greatly
appreciated.

Rion

On Fri, Aug 13, 2021 at 9:04 AM David Morávek  wrote:

> To give you a better idea, in high-level I think could look something like
> this  [1].
>
> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>
> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams 
> wrote:
>
>> Hi David,
>>
>> Thanks for your re

Re: NullPointerException in StateTable.put()

2021-08-16 Thread David Morávek
Great, let me know if that helped ;)

Best,
D.

On Mon, Aug 16, 2021 at 4:36 PM László Ciople 
wrote:

> The events are json dictionaries and the key is a field which represents a
> device id, or if it doesn't exist, then actually a *hashCode *of the
> device object in the dictionary is used. So this could be the problem then.
>
> On Mon, Aug 16, 2021 at 5:33 PM David Morávek  wrote:
>
>> Hi László,
>>
>> My intuition is that you have a non-deterministic shuffle key. If you
>> perform any "per-key" operation, you need to make sure that the same key
>> always end up in the same partition. To simplify this, it means that the
>> key needs to have a consistent *hashCode* and *equals* across different
>> JVMs.
>>
>> Usual mistake is that hash code is not deterministic, because it defaults
>> to `System.identityHashCode(..)` (this applies for any custom object that
>> doesn't override hashCode, for arrays, for enums, ...).
>>
>> What are you using as key for this operation?
>>
>> Best,
>> D.
>>
>> On Mon, Aug 16, 2021 at 4:00 PM László Ciople 
>> wrote:
>>
>>> Hello,
>>> I am trying to write a Flink application which receives data from Kafka,
>>> does processing on keyed windowed streams and sends results on a
>>> different topic.
>>> Shortly after the job is started it fails with a NullPointerException in
>>> StateTable.put(). I am really confused by this error, because I am not
>>> explicitly working with state and in the exception stack, I cannot find a
>>> reference to my own code. I'd really appreciate it if anyone would help me
>>> figure out what's going on. Here's the exception stack:
>>>
>>> 2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>>> Window(TumblingEventTimeWindows(180), EventTimeTrigger,
>>> CredentialStuffingAlerter) -> Map -> Sink:
>>> xdr.azure.analytics.login.credential_stuffing (2/2)
>>> (4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on
>>> senso-api-lciople-credential-stuffing-taskmanager-1-2 @
>>> ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
>>> 2021-08-16 16:28:54 java.lang.NullPointerException: null
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
>>> 2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Flink
>>> Streaming Job (80895c82f109899577d166b4388c157d) switched from state
>>> RUNNING to RESTARTING.
>>>
>>


Re: NullPointerException in StateTable.put()

2021-08-16 Thread David Morávek
My intuition is that you have a non-deterministic shuffle key. If you
perform any "per-key" operation, you need to make sure that the same key
always end up in the same partition. To simplify this, it means that the
key needs to have a consistent *hashCode* and *equals* across different
JVMs.

Usual mistake is that hash code is not deterministic, because it defaults
to `System.identityHashCode(..)` (this applies for any custom object that
doesn't override hashCode, for arrays, for enums, ...).
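As an illustration of that requirement, a minimal Java sketch of a key type that is safe to shuffle on (value-based equals/hashCode over plain fields, nothing identity- or array-based; the field names are hypothetical):

import java.io.Serializable;
import java.util.Objects;

/** A key that hashes the same way on every JVM, so keyBy(...) partitions it consistently. */
public final class DeviceKey implements Serializable {

    private final String tenant;
    private final String deviceId;

    public DeviceKey(String tenant, String deviceId) {
        this.tenant = tenant;
        this.deviceId = deviceId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DeviceKey)) return false;
        DeviceKey other = (DeviceKey) o;
        return Objects.equals(tenant, other.tenant) && Objects.equals(deviceId, other.deviceId);
    }

    @Override
    public int hashCode() {
        // Derived only from field values, never from object identity.
        return Objects.hash(tenant, deviceId);
    }
}

A plain String key (for example a hash of the relevant content) works just as well; the point is that the hash must not change between JVMs or restarts.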

What are you using as key for this operation?

Best,
D.

On Mon, Aug 16, 2021 at 4:00 PM László Ciople 
wrote:

> Hello,
> I am trying to write a Flink application which receives data from Kafka,
> does processing on keyed windowed streams and sends results on a
> different topic.
> Shortly after the job is started it fails with a NullPointerException in
> StateTable.put(). I am really confused by this error, because I am not
> explicitly working with state and in the exception stack, I cannot find a
> reference to my own code. I'd really appreciate it if anyone would help me
> figure out what's going on. Here's the exception stack:
>
> 2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> Window(TumblingEventTimeWindows(180), EventTimeTrigger,
> CredentialStuffingAlerter) -> Map -> Sink:
> xdr.azure.analytics.login.credential_stuffing (2/2)
> (4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on
> senso-api-lciople-credential-stuffing-taskmanager-1-2 @
> ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
> 2021-08-16 16:28:54 java.lang.NullPointerException: null
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
> 2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Flink
> Streaming Job (80895c82f109899577d166b4388c157d) switched from state
> RUNNING to RESTARTING.
>


Re: Exploring Flink for a HTTP delivery service.

2021-08-16 Thread David Morávek
Hi Prasanna,

here are some quick thoughts

1) Batching is an aggregation operation. But what I have seen in the
> examples of windowing is that they get the count/max/min operation in the
> particular window. So could the batching be implemented via a windowing
> mechanism?
>

I guess a custom function with state and processing timers should give you
the flexibility you want. BTW, Apache Beam uses this approach for the
batching you're describing (they call these bundles) [1][2].
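As a rough illustration of that pattern, a Java sketch of a keyed function that flushes either when a size threshold is reached or when a processing-time timeout fires, whichever comes first (the thresholds and the String element type are placeholders):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BatchingFunction extends KeyedProcessFunction<String, String, List<String>> {

    private static final int MAX_BATCH_SIZE = 1000;
    private static final long MAX_WAIT_MS = 1000L;

    private transient ListState<String> buffer;
    private transient ValueState<Integer> count;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", String.class));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out) throws Exception {
        buffer.add(value);
        int newCount = (count.value() == null ? 0 : count.value()) + 1;
        count.update(newCount);

        if (timerState.value() == null) {
            // First element of a new batch: start the timeout clock.
            long timer = ctx.timerService().currentProcessingTime() + MAX_WAIT_MS;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
        if (newCount >= MAX_BATCH_SIZE) {
            // Size threshold reached first: cancel the pending timeout and flush.
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
            timerState.clear();
            flush(out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out) throws Exception {
        // Timeout fired before the size threshold was reached.
        timerState.clear();
        flush(out);
    }

    private void flush(Collector<List<String>> out) throws Exception {
        List<String> batch = new ArrayList<>();
        for (String element : buffer.get()) {
            batch.add(element);
        }
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
        buffer.clear();
        count.clear();
    }
}

The timer is registered when the first element of a batch arrives and cancelled when a size-based flush happens first, so a batch is never emitted twice.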

2) Is the performance of broadcasted state better than a LookUp Cache?
> (Personally I have implemented broadcasted state for another purpose and am
> not sure about the performance of querying DB + LookUpCache)
>

My intuition is that broadcasted state should be faster, because it's local
to the process. However, this would depend on state size, memory
requirements, the garbage collector, etc. I think that with smart batching
the throughput of remote lookups may be as good as broadcasted state (in
most clouds, the network is fast enough for this). You should also take into
consideration whether you want some kind of (watermark) synchronization
between the two streams (this is basically a special type of stream-to-stream
join), or whether you don't care that the config may not be up to date.


> Using Keyed Process Broadcast looks better than using non-keyed, as the
> same data is not replicated across all the parallel operators.
> A caveat here is that the load across all subscriptions is not the same.
> So if we key the stream, then we might end up with an unbalanced job.
> Thoughts on this?
>

If one side (the broadcasted one) is way smaller than the other side (and fits
in memory), it should be better to broadcast it, as you don't have to perform
an extra shuffle of the bigger side. This is also called a map-side join.
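For completeness, a minimal Java sketch of that broadcast (map-side join) variant, where the small config stream is broadcast and kept in broadcast state (the String payloads, the descriptor name and the subscriptionOf() extraction are placeholders; a keyed variant would use KeyedBroadcastProcessFunction after a keyBy):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastConfigJoin {

    // Maps subscription id -> endpoint config (both kept as strings here for brevity).
    static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("subscription-config", Types.STRING, Types.STRING);

    public static DataStream<String> apply(DataStream<String> events, DataStream<String> configs) {
        BroadcastStream<String> broadcastConfigs = configs.broadcast(CONFIG_DESCRIPTOR);

        return events
                .connect(broadcastConfigs)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Enrich with whatever config is currently known; the big stream is never shuffled.
                        String config = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get(subscriptionOf(event));
                        out.collect(config == null ? event : event + "|" + config);
                    }

                    @Override
                    public void processBroadcastElement(String config, Context ctx, Collector<String> out)
                            throws Exception {
                        // Config updates are replicated to every parallel instance.
                        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(subscriptionOf(config), config);
                    }
                });
    }

    // Placeholder for extracting the subscription id from a record.
    static String subscriptionOf(String record) {
        return record.split("\\|", 2)[0];
    }
}

The trade-off is the one described above: every parallel instance holds the full config in memory, but the large event stream avoids an extra shuffle.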

4) Latency must be minimal, so the first thought is to store the
> messages to be batched in HashMapStateBackend.
> But storing both the config state and the data on the heap might increase
> the memory usage a lot, since there could be a huge spike in load. Are
> there any other things that need to be factored in?
>

There is a nice write-up on this topic in FLIP-50 [3]. Also GC is a great
factor when it comes to low latency (depends on what your requirements
are). We had a really great results with ZGC and Java 16, but it takes some
effort to get everything working (this is also not officially supported by
Flink community).

5) Auto-scaling capability would save a lot of cost because of consistent
> load patterns with occasional spikes. Though reactive scaling was introduced
> in Flink 1.13, we don't know whether it's battle hardened.
>

There is still lot of work to be done in this direction. Having reactive
scaling is just one piece of the puzzle. Right now, re-scaling time
(savepoint + restore) is probably the biggest concern. If you want to have
a super low latency, re-scale might hit you hard, especially with larger
states.

6) After looking at the solutions, does Flink seem to be a natural fit for
> this use case in comparison to the Spring Reactor framework/vert.x?
> One thing we see from the documentation is that Spring Reactive can
> auto-scale very well, but we need to work on fault tolerance/stability from
> the dev side, which Flink is good at.
>

These technologies are designed for a different purpose than Flink.

[1]
https://github.com/apache/beam/blob/v2.31.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L525
[2]
https://github.com/apache/beam/blob/v2.31.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L878
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-50%3A+Spill-able+Heap+Keyed+State+Backend

Best,
D.


On Sat, Aug 14, 2021 at 5:12 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> Aim: Building an event delivery
> service
> Scale : Peak load 50k messages/sec.
> Average load 5k messages/sec Expected to grow every passing month
> Unique Customer Endpoints : 10k+
> Unique events(kafka topics)  : 500+
> Unique tenants  : 30k+
> Subscription Level : Events are generated for tenants.
> Customers may subscribe a)
> entirely to an event or b) either at tenant level ( 5 tenants or 100
> tenants) or c) even at sub-tenant level. ( Tenant 2. Dept 100,200,300)
> *Other Requirements *:
> 1) Batching events based on quantity or minimum threshold time whichever
> comes first . Example 1000 messages or 1 sec.
> 2) Message size < 1kb
>
> *Possible Solutions:*
>
> 1) Build an app using reactive programming say vert.x/spring reactive etc
> 2) Use apache flink
>
> *Flink Solution *
> RDS : Has the subscription connection details
>
> [image: Flink HTTP Publisher.png]
>
> 2a ) Use DB and Look UP Cache to retrieve Configurations
>
> (i)   Str

NullPointerException in StateTable.put()

2021-08-16 Thread László Ciople
Hello,
I am trying to write a Flink application which receives data from Kafka,
does processing on keyed windowed streams and sends results on a
different topic.
Shortly after the job is started it fails with a NullPointerException in
StateTable.put(). I am really confused by this error, because I am not
explicitly working with state and in the exception stack, I cannot find a
reference to my own code. I'd really appreciate it if anyone would help me
figure out what's going on. Here's the exception stack:

2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
Window(TumblingEventTimeWindows(180), EventTimeTrigger,
CredentialStuffingAlerter) -> Map -> Sink:
xdr.azure.analytics.login.credential_stuffing (2/2)
(4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on
senso-api-lciople-credential-stuffing-taskmanager-1-2 @
ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
2021-08-16 16:28:54 java.lang.NullPointerException: null
2021-08-16 16:28:54 at
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
~[flink-dist_2.11-1.12.3.jar:1.12.3]
2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Flink
Streaming Job (80895c82f109899577d166b4388c157d) switched from state
RUNNING to RESTARTING.


Re: s3 access denied with flink-s3-fs-presto

2021-08-16 Thread David Morávek
Hi Vamshi,

From your configuration I'm guessing that you're using Amazon S3 (not an
alternative implementation such as MinIO).

Two comments:
- *s3.endpoint* should not contain the bucket (that is already included in
your s3 path, eg. *s3:///*)
- "*s3.path.style.access*: true" is only correct for 3rd-party
implementations such as MinIO / Swift, which have the bucket defined in the
URL path instead of the subdomain
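
For illustration, with an AWS bucket in us-west-2 the relevant lines would
look roughly like this (region and values are placeholders, and for AWS you
can usually omit the endpoint entirely):

s3.endpoint: s3.us-west-2.amazonaws.com
s3.path.style.access: false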

You can find some information about connecting to s3 in Flink docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/

Best,
D.


On Tue, Aug 10, 2021 at 2:37 AM Vamshi G  wrote:

> We are using Flink version 1.13.0 on Kubernetes.
> For checkpointing we have configured fs.s3 flink-s3-fs-presto.
> We have enabled sse on our buckets with kms cmk.
>
> flink-conf.yaml is configured as below.
> s3.entropy.key: _entropy_
> s3.entropy.length: 4
> s3.path.style.access: true
> s3.ssl.enabled: true
> s3.sse.enabled: true
> s3.sse.type: KMS
> s3.sse.kms-key-id: 
> s3.iam-role: 
> s3.endpoint: .s3-us-west-2.amazonaws.com
> s3.credentials-provider:
> com.amazonaws.auth.profile.ProfileCredentialsProvider
>
> However, PUT operations on the bucket are resulting in access denied
> error. Access policies for the role are checked and works fine when checked
> with CLI.
> Also, can't get to see debug logs from presto s3 lib, is there a way to
> enable logger for presto airlift logging?
>
> Any inputs on above issue?
>
>


Re: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-16 Thread David Morávek
Hi Andreas,

Per-job and session deployment modes should not be affected by this FLIP.
Application mode is just a new deployment mode (where the job driver runs
embedded within the JM) that co-exists with these two.
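
Just for completeness, an application-mode submission looks roughly like
this (a sketch only; the memory values mirror the ones in your current
command and are placeholders, not a recommendation):

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=4096m \
    -Dtaskmanager.memory.process.size=12288m \
    /path/to/artifact.jar application-args-go-here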

From the information you've provided, I'd say your actual problem is this
exception:

```

Caused by: java.lang.ExceptionInInitializerError

at
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)

at
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)

at
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)

at com.sun.jersey.api.client.Client.init(Client.java:342)

at com.sun.jersey.api.client.Client.access$000(Client.java:118)

at com.sun.jersey.api.client.Client$1.f(Client.java:191)

at com.sun.jersey.api.client.Client$1.f(Client.java:187)

at
com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)

at com.sun.jersey.api.client.Client.<init>(Client.java:187)

at com.sun.jersey.api.client.Client.<init>(Client.java:170)

at
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```


I've seen this exception a few times with Hadoop already and it's usually a
dependency / class-path problem. If you google for this you'll find many
references.
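
One workaround that often comes up for this particular TimelineClientImpl /
jersey failure (an assumption on my side - it only applies if your jobs
don't actually need the YARN timeline service) is to disable the timeline
client in yarn-site.xml so the jersey code path is never initialized:

<property>
    <name>yarn.timeline-service.enabled</name>
    <value>false</value>
</property>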


Best,

D.



On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] <
andreas.ha...@gs.com> wrote:

> Hello folks!
>
>
>
> We’re looking to upgrade from 1.9 to 1.11. Our Flink applications run on
> YARN and each have their own clusters, with each application having
> multiple jobs submitted.
>
>
>
> Our current submission command looks like this:
>
> $ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name
> -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar
> -application-args-go-here
>
>
>
> The behavior observed in versions <= 1.9 is the following:
>
> 1. A Flink cluster gets deployed to YARN
>
> 2. Our application code is run, building graphs and submitting jobs
>
>
>
> When we rebuilt and submit using 1.11.2, we now observe the following:
>
> 1. Our application code is run, building graph and submitting jobs
>
> 2. A Flink cluster gets deployed to YARN once execute() is invoked
>
>
>
> I presume that this is a result of FLIP-85 [1] ?
>
>
>
> This change in behavior proves to be a problem for us as our application
> is multi-threaded, and each thread submits its own job to the Flink
> cluster. What we see is that the first thread to call execute() submits a
> job to YARN, and the others fail with a ClusterDeploymentException.
>
>
>
> 2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Cluster specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor -
> Cluster specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> Listening for transport dt_socket at address: 5005
>
> 2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Adding delegation token to the AM container.
>
> 2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created
> HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
>
> 2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for
> hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536,
> Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user)
>
> 2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to
> obtain Kerberos security token for HBase
>
> 2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not
> available (not packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HBaseConfiguration".
>
> 2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Submitting application master application_1628393898291_71530
>
> 2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception
> running data flow for flink-thread-#2
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
>
> at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:

Re: Problems with reading ORC files with S3 filesystem

2021-08-16 Thread David Morávek
Hi Piotr,

unfortunately this is a known long-standing issue [1]. The problem is that
the ORC format is not using Flink's filesystem abstraction for the actual
reading of the underlying file, so you have to adjust your Hadoop config
accordingly. There is also a corresponding SO question [2] covering this.
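
To give an idea of what "adjust your Hadoop config" means in practice - this
is only a sketch with placeholder values, and I haven't verified it against
MinIO - you would point the table path at s3a:// and provide the s3a
credentials in a Hadoop configuration (e.g. core-site.xml) visible to the
ORC reader. It also assumes hadoop-aws (and its AWS SDK) is on the job
classpath instead of jets3t:

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3a.endpoint</name>
  <value>http://minio:9000</value>
</property>
<property>
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3a.access.key</name>
  <value>...</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>...</value>
</property>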

I think a proper fix would actually require changing the interface on the
ORC side, because currently there seems to be no easy way to switch the FS
implementation (I've just quickly checked the OrcFile class, so this might
not be 100% accurate).

[1] https://issues.apache.org/jira/browse/FLINK-10989
[2] https://stackoverflow.com/a/53435359

Best,
D.

On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski  wrote:

> Hi,
> I want to use Flink SQL filesystem to read ORC file via S3 filesystem on
> Flink 1.13. My table definition looks like this:
>
> create or replace table xxx
>  (..., startdate string)
>  partitioned by (startdate) with ('connector'='filesystem',
> 'format'='orc', 'path'='s3://xxx/orc/yyy')
>
> I followed Flink's S3 guide and installed S3 libs as plugin. I have MinIO
> as S3 provider and it works for Flinks checkpoints and HA files.
> The SQL connector also works when I use CSV or Avro formats. The problems
> start with ORC
>
> 1. If I just put flink-orc on job's classpath I get error on JobManager:
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/hadoop/conf/Configuration
> at
> org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> ~[?:?]
> at
> org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> ~[?:?]
> at
> org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
>
> 2. I managed to put hadoop common libs on the classpath by this maven
> setup:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-orc_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <exclusions>
>         <exclusion>
>             <groupId>org.apache.orc</groupId>
>             <artifactId>orc-core</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
> <dependency>
>     <groupId>org.apache.orc</groupId>
>     <artifactId>orc-core</artifactId>
>     <version>1.5.6</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.orc</groupId>
>     <artifactId>orc-shims</artifactId>
>     <version>1.5.6</version>
> </dependency>
> <dependency>
>     <groupId>net.java.dev.jets3t</groupId>
>     <artifactId>jets3t</artifactId>
>     <version>0.9.0</version>
> </dependency>
>
> No the job is accepted by JobManager, but execution fails with lack of AWS
> credentials:
> Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> Secret Access Key must be specified as the username or password
> (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> fs.s3.awsSecretAccessKey properties (respectively).
> at
> org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> at
> org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> at
> org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
> at
> org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
> at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
> at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
>
> I guess that ORC reader tries to recreate s3 filesystem in job's
> classloader and cannot use credentials from flink-conf.yaml. However I can
> see in the logs that it earlier managed to list the files on MinIO:
>
> 2021-08-14 09:35:48,285 INFO
> org.apache.flink.connector.file.src.assigners.LocalityAwareSpl

Re: Scaling Flink for batch jobs

2021-08-16 Thread Gorjan Todorovski
Thanks, I'll check more about job tuning.

On Mon, 16 Aug 2021 at 06:28, Caizhi Weng  wrote:

> Hi!
>
> if I use parallelism of 2 or 4 - it takes the same time.
>>
> It might be that there is no data in some parallelisms. You can click on
> the nodes in Flink web UI and see if it is the case for each parallelism,
> or you can check out the metrics of each operator.
>
> if I don't increase parallelism and just run the job on a fixed number of
>> task slots, the job will fail (due to lack of memory on the task manager)or
>> it will just take longer time to process the data?
>>
> It depends on a lot of aspects, such as the type of source you are using,
> the type of operators you are running, etc. Ideally we hope it will just
> take longer but for some specific operators or connectors it might fail.
> This is where users have to tune their jobs.
>
> Gorjan Todorovski  于2021年8月13日周五 下午6:48写道:
>
>> Hi!
>>
>> I want to implement a Flink cluster as a native Kubernetes session
>> cluster, with intention of executing Apache Beam jobs that will process
>> only batch data, but I am not sure I understand how I would scale the
>> cluster if I need to process large datasets.
>>
>> My understanding is that to be able to process a bigger dataset, you
>> could run it with higher parallelism, so the processing will be spread on
>> multiple task slots, which might run multiple nodes.
>> But running Beam jobs which actually in my case execute TensorFlow
>> Extended pipelines, I am not able to have control over partitioning over
>> some keys and I don't see any difference in throughput (the time it takes
>> to process specific dataset), if I use parallelism of 2 or 4 - it takes the
>> same time.
>>
>> Also, does it mean if I want to process a dataset of any size since the
>> execution is of type "PIPELINED", does this mean, if I don't increase
>> parallelism and just run the job on a fixed number of task slots, the job
>> will fail (due to lack of memory on the task manager)or it will just take
>> longer time to process the data?
>>
>> Thanks,
>> Gorjan
>>
>


Re: JobManager Resident memory Always Increasing

2021-08-16 Thread David Morávek
Hi Pranjul,

which deployment mode are you using?

- For a session cluster, I'd say it's possible that the memory grows with
the # of jobs.
- For application mode, there is actual user code executed, so if you're
using some native libraries in your job driver, that may be another reason
for the growing memory.

If you want to debug the issue yourself, Yun already provided good pointers
on how to approach that.
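
For reference, the native memory tracking Yun mentioned can be switched on
roughly like this (standard JVM tooling; a sketch only):

env.java.opts.jobmanager: -XX:NativeMemoryTracking=summary

and then, inside the running JobManager container:

jcmd <jobmanager-pid> VM.native_memory summary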

Best,
D.


On Mon, Aug 16, 2021 at 8:38 AM Yun Tang  wrote:

> Hi Pranjul,
>
> First of all, you adopted an on-heap state backend (HashMapStateBackend),
> which would not use native off-heap memory. Moreover, the JobManager would
> not initialize any keyed state backend instance. And if high availability
> is not enabled, JobManagerCheckpointStorage would also not use direct memory
> to write the checkpoint stream out. Did you use some third-party native library?
>
> You could use native memory tracking [1] to see whether JVM used too much
> overhead native memory. And use memory allocator analysis tool such as
> JeMalloc [2] to see whether existed unexpected native memory usage.
>
>
> [1] https://shipilev.net/jvm/anatomy-quarks/12-native-memory-tracking/
> [2] https://gist.github.com/thomasdarimont/79b3cef01e5786210309
>
> Best
> Yun Tang
> --
> *From:* Pranjul Ahuja 
> *Sent:* Monday, August 16, 2021 13:10
> *To:* user@flink.apache.org 
> *Subject:* JobManager Resident memory Always Increasing
>
> Hi,
>
> We are running the JobManager container with 1024MB, out of which 512MB is
> allocated to the heap. We observe that the JobManager container's resident
> memory is always increasing and never comes down. Heap usage remains
> constant and is not rising abruptly. Can anyone help figure out where else
> this memory is going?
>
> Statebackend Used - HashMapStateBackend
> Checkpoint Storage - JobManagerCheckpointStorage
>
>
>