Anyone trying to adopt Scotty on the recent Flink versions?

2021-12-27 Thread Dongwon Kim
Hi community,

We've recently been trying to adopt Scotty to overcome the poor performance
caused by too many sliding windows.

We're facing the following exception on the latest Flink-1.14.2:

switched from RUNNING to FAILED with failure cause:
java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:424)
at java.util.ArrayList.get(ArrayList.java:437)
at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
at de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
at de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
at de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator.processElement(KeyedScottyWindowOperator.java:62)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)

We create a window operator as follows:

import de.tub.dima.scotty.core.windowType.{SlidingWindow, WindowMeasure}
import de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator
import de.tub.dima.scotty.flinkconnector.{_}

val windowOp = new KeyedScottyWindowOperator[(java.lang.String, Long), NaviGpsProcessable, NaviTrafficUserResult](
  new NaviTrafficUserAggregationScotty())
windowOp.addWindow(new SlidingWindow(WindowMeasure.Time, 600_000, 60_000))

val userAggStream = stream
  .keyBy(el => (el.id, el.trafficId))
  .process(windowOp)
  .map(_.getAggValues.get(0));

Can I get any advice on this?

Best,

Dongwon


Re: How to reduce interval between Uptime Metric measurements?

2021-12-27 Thread Caizhi Weng
Hi!

Have you tried metrics.reporter.promgateway.interval? See [1] for more
detail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheuspushgateway
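
For reference, a minimal flink-conf.yaml sketch of the push gateway reporter with
an explicit reporting interval (host and port are placeholders for your setup; see
[1] for the full option list):

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 5 SECONDS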

Geldenhuys, Morgan Karl wrote on Tue, Dec 28, 2021, at 02:43:

> Hello everyone,
>
>
> I have a flink 1.14 job running and im looking at the uptime metric (
> flink_jobmanager_job_uptime) together with prometheus (scrape every
> second). It looks as if this metric is updated every 60 seconds, is there a
> way of decreasing this interval? A fixed delay recovery strategy of 1s is
> being used. There doesnt seem to be anything related to this in the configs
> (
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
> ).
>
>
> Regards,
>
> M.
>


How to reduce interval between Uptime Metric measurements?

2021-12-27 Thread Geldenhuys, Morgan Karl
Hello everyone,


I have a Flink 1.14 job running and I'm looking at the uptime metric
(flink_jobmanager_job_uptime) together with Prometheus (scraping every second).
It looks as if this metric is updated every 60 seconds; is there a way of
decreasing this interval? A fixed-delay recovery strategy of 1s is being used.
There doesn't seem to be anything related to this in the configs
(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/).



Regards,

M.


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-27 Thread Dong Lin
Hi Arvid,

Thanks for the suggestion! Sorry for the late reply. I just finished
investigating the PulsarSource/StopCursor as you suggested. Please see my
reply inline.

On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise  wrote:

> Hi Dong,
>
> I see your point. The main issue with dynamic EOF is that we can't run in
> batch mode. That may be desired in the case of Ayush but there may be other
> use cases where it's not.
>

Could you help explain why we cannot dynamically stop reading from a
source in batch mode?

My understanding is that when a message with the particular pattern
(specified by the user) is encountered, we can have the source operator
emit the high-watermark in such a way as if the particular partition of
this source has reached EOF. And this must have worked since users have
been using KafkaDeserializationSchema::isEndOfStream with the
legacy FlinkKafkaConsumer. Did I miss something here?
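
For context, a minimal sketch of that legacy pattern (the String payload and the
"END_OF_DAY" control marker are just hypothetical examples):

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

class StopOnMarkerSchema extends KafkaDeserializationSchema[String] {
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String =
    new String(record.value(), StandardCharsets.UTF_8)

  // Returning true makes the legacy FlinkKafkaConsumer treat this partition as finished.
  override def isEndOfStream(nextElement: String): Boolean =
    nextElement == "END_OF_DAY"

  override def getProducedType: TypeInformation[String] = Types.STRING
}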

> Additionally, it's quite a bit of code if you'd implement a
> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
> on how to use it from Table/SQL.
>

Hmm.. users already need to provide a KafkaRecordDeserializationSchema
via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
harder for users to implement KafkaRecordDeserializationSchema?

Regarding "how to use it from Table/SQL", support we allow user to encode
this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
Collector::close() if the message content matches a user-specified
pattern), then effect of this change is same as if the partition has
reached EOF, and Table/SQL can handle this effect as they are doing now
without any extra change. Does this make sense?


>
> I think we should get inspired on how PulsarSource is solving it. They
> have an orthogonal interface StopCursor (we could call it StopCondition)
> [1]. It has some default values (I wonder if we could implement them as
> enums for easier Table integration).
>

It appears that StopCursor::shouldStop(...) takes a raw Message. While users
could implement the dynamic EOF logic in this method, I am worried that
this approach would lead to inferior performance due to double message
deserialization.

The reason is that the user's logic will likely depend on the deserialized
message (as opposed to the raw bytes returned by
org.apache.pulsar.client.api.Message.getData()). In this case, users will
need to deserialize the message inside StopCursor::shouldStop(...) first,
and then the message would be deserialized again by
the PulsarDeserializationSchema, which is specified via
PulsarSourceBuilder::setDeserializationSchema.

In comparison, messages can be deserialized only once if we allow users to
specify the dynamic EOF logic inside
KafkaRecordDeserializationSchema/PulsarDeserializationSchema.


> Ideally, this interface would subsume OffsetsInitializer on stopping side.
> I think it was not wise to use OffsetsInitializer also for stop offsets as
> things like OffsetResetStrategy do not make any sense.
>

Do you mean that you prefer to replace
KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?

Without digging into the details of whether this replacement is feasible, I
agree StopCursor seems cleaner than OffsetsInitializer. On the other
hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
due to the double deserialization issue described above), I guess it is
probably simpler to separate this from the discussion of dynamic EOF?


>
> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
> to hand in the KafkaClient (as we do in Pulsar).
>

Do you mean that you prefer to remove KafkaClient from
PartitionOffsetsRetrieverImpl?

I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
only without using KafkaClient. On the other hand, it seems that there is
no performance/correctness concern with the existing approach? Is this
issue related to the discussion of dynamic EOF?


> I hope I gave some pointers.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>
> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin  wrote:
>
>> Yep,  dynamic schema change could be a good solution for the particular
>> use-case mentioned by Ayush.
>>
>> On the other hand, I have heard of valid use-cases where we want to stop
>> the job based on a control message. For example, let's say we have a Flink
>> job that keeps processing stock transaction data fetched from Kafka in real
>> time. Suppose the stock market closes at 4pm, we probably want the Flink
>> job to stop after it has processed all the transaction data of that day,
>> instead of running it for the whole day, in order to save CPU cost.
>>
>> As of Flink 1.13, users can ach

Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-27 Thread John Smith
Ok, all the settings above are for a smaller dev cluster, and I'm experimenting
with setting the metaspace size to 2GB. It runs the same jobs as production,
just with less data volume.

The jcmd snapshot below is from a slightly bigger task manager on the active
cluster... It also runs out of metaspace once in a while, so I'm thinking of
raising its metaspace to 2GB as well. This is what started the actual
investigation.

taskmanager.memory.flink.size: 10240m
taskmanager.memory.jvm-metaspace.size: 1024m <-- Up to 2GB.
taskmanager.numberOfTaskSlots: 12

jcmd 2128 GC.heap_info
2128:
 garbage-first heap   total 5111808K, used 2530277K [0x00068880, 0x000688a04e00, 0x0007c080)
  region size 2048K, 810 young (1658880K), 4 survivors (8192K)
 Metaspace       used 998460K, capacity 1022929K, committed 1048576K, reserved 1972224K
  class space    used 112823K, capacity 121063K, committed 126024K, reserved 1048576K

On Mon, 27 Dec 2021 at 10:27, John Smith  wrote:

> Yes standalone cluster. 3 zoo, 3 job, 3 tasks.
>
> The task managers have taskslots at double core. So 2*4
>
> I think metaspace of 2GB is ok. I'll try to get some jcmd stats.
>
> The jobs are fairly straight forward ETL they read from Kafka, do some
> json parsing, using vertx.io json parser and either Insert to apache
> ignite cache or jdbc db.
>
>
> On Sun., Dec. 26, 2021, 8:46 p.m. Xintong Song, 
> wrote:
>
>> Hi John,
>>
>> Sounds to me you have a Flink standalone cluster deployed directly on
>> physical hosts. If that is the case, use `t.m.flink.size` instead of
>> `t.m.process.size`. The latter does not limit the overall memory
>> consumption of the processes, and is only used for calculating how much
>> non-JVM memory the process should leave in a containerized setup, which
>> does no good in a non-containerized setup.
>>
>> When running into a Metaspace OOM, the standard solution is to increase
>> `t.m.jvm-metaspace.size`. If this is impractical due to the physical
>> limitations, you may also try to decrease `taskmanager.numberOfTaskSlots`.
>> If you have multiple jobs submitted to a shared Flink cluster, decreasing
>> the number of slots in a task manager should also reduce the amount of
>> classes loaded by the JVM, thus requiring less metaspace.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Dec 27, 2021 at 9:08 AM John Smith 
>> wrote:
>>
>>> Ok I tried taskmanager.memory.process.size: 7168m
>>>
>>> It's worst, the task manager can barely start before it throws
>>> java.lang.OutOfMemoryError: Metaspace
>>>
>>> I will try...
>>> taskmanager.memory.flink.size: 5120m
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>>
>>>
>>> On Sun, 26 Dec 2021 at 19:46, John Smith  wrote:
>>>
 Hi running Flink 1.10

 I have

 taskmanager.memory.flink.size: 6144m
 taskmanager.memory.jvm-metaspace.size: 1024m
 taskmanager.numberOfTaskSlots: 8
 parallelism.default: 1

 1- The host has a physical ram of 8GB. I'm better off just to configure
 "taskmanager.memory.process.size" as 7GB and let flink figure it out?
 2- Is there a way for me to calculate how much metspace my jobs require
 or are using?

 2021-12-24 04:53:32,511 ERROR
 org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
 Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
 exception. Stopping the process...
 java.lang.OutOfMemoryError: Metaspace

>>>


Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-27 Thread John Smith
Yes, standalone cluster: 3 ZooKeeper nodes, 3 job managers, 3 task managers.

The task managers have task slots set to double the number of cores, so 2*4 = 8.

I think a metaspace of 2GB is ok. I'll try to get some jcmd stats.

The jobs are fairly straightforward ETL: they read from Kafka, do some JSON
parsing using the vertx.io JSON parser, and either insert into an Apache Ignite
cache or a JDBC database.


On Sun., Dec. 26, 2021, 8:46 p.m. Xintong Song, 
wrote:

> Hi John,
>
> Sounds to me you have a Flink standalone cluster deployed directly on
> physical hosts. If that is the case, use `t.m.flink.size` instead of
> `t.m.process.size`. The latter does not limit the overall memory
> consumption of the processes, and is only used for calculating how much
> non-JVM memory the process should leave in a containerized setup, which
> does no good in a non-containerized setup.
>
> When running into a Metaspace OOM, the standard solution is to increase
> `t.m.jvm-metaspace.size`. If this is impractical due to the physical
> limitations, you may also try to decrease `taskmanager.numberOfTaskSlots`.
> If you have multiple jobs submitted to a shared Flink cluster, decreasing
> the number of slots in a task manager should also reduce the amount of
> classes loaded by the JVM, thus requiring less metaspace.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Dec 27, 2021 at 9:08 AM John Smith  wrote:
>
>> Ok I tried taskmanager.memory.process.size: 7168m
>>
>> It's worst, the task manager can barely start before it throws
>> java.lang.OutOfMemoryError: Metaspace
>>
>> I will try...
>> taskmanager.memory.flink.size: 5120m
>> taskmanager.memory.jvm-metaspace.size: 2048m
>>
>>
>> On Sun, 26 Dec 2021 at 19:46, John Smith  wrote:
>>
>>> Hi running Flink 1.10
>>>
>>> I have
>>>
>>> taskmanager.memory.flink.size: 6144m
>>> taskmanager.memory.jvm-metaspace.size: 1024m
>>> taskmanager.numberOfTaskSlots: 8
>>> parallelism.default: 1
>>>
>>> 1- The host has a physical ram of 8GB. I'm better off just to configure
>>> "taskmanager.memory.process.size" as 7GB and let flink figure it out?
>>> 2- Is there a way for me to calculate how much metspace my jobs require
>>> or are using?
>>>
>>> 2021-12-24 04:53:32,511 ERROR
>>> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
>>> Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
>>> exception. Stopping the process...
>>> java.lang.OutOfMemoryError: Metaspace
>>>
>>


Re: Parquet files in streaming mode

2021-12-27 Thread Martijn Visser
Hi,

Have you looked into File Compaction (which is supported on the Table/SQL
side)? [1]
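
For reference, a rough sketch of what that looks like in SQL DDL (the compaction
options follow the linked page; the schema, path and sizes here are made-up
placeholders, and you'll want to check that your Flink version actually ships
this feature):

CREATE TABLE parquet_sink (
  user_id STRING,
  event_time TIMESTAMP(3),
  cnt BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/output',
  'format' = 'parquet',
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);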

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction

On Mon, 27 Dec 2021 at 16:10, Deepak Sharma  wrote:

> I would suggest taking a look at CheckpointRollingPolicy.
> You need to extend it and override the default behviors in your FileSink.
>
> HTH.
>
> Thanks
> Deepak
>
> On Mon, Dec 27, 2021 at 8:13 PM Mathieu D  wrote:
>
>> Hello,
>>
>> We’re trying to use a Parquet file sink to output files in s3.
>>
>> When running in Streaming mode, it seems that parquet files are flushed
>> and rolled at each checkpoint. The result is a crazy high number of very
>> small parquet files which completely defeats the purpose of that format.
>>
>>
>> Is there a way to build larger output parquet files? Or is it only at the
>> price of having a very large checkpointing interval?
>>
>> Thanks for your insights.
>>
>> Mathieu
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Parquet files in streaming mode

2021-12-27 Thread Deepak Sharma
I would suggest taking a look at CheckpointRollingPolicy.
You need to extend it and override the default behaviors in your FileSink.

HTH.

Thanks
Deepak

On Mon, Dec 27, 2021 at 8:13 PM Mathieu D  wrote:

> Hello,
>
> We’re trying to use a Parquet file sink to output files in s3.
>
> When running in Streaming mode, it seems that parquet files are flushed
> and rolled at each checkpoint. The result is a crazy high number of very
> small parquet files which completely defeats the purpose of that format.
>
>
> Is there a way to build larger output parquet files? Or is it only at the
> price of having a very large checkpointing interval?
>
> Thanks for your insights.
>
> Mathieu
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Parquet files in streaming mode

2021-12-27 Thread Mathieu D
Hello,

We’re trying to use a Parquet file sink to output files in s3.

When running in Streaming mode, it seems that parquet files are flushed and
rolled at each checkpoint. The result is a crazy high number of very small
parquet files which completely defeats the purpose of that format.


Is there a way to build larger output parquet files? Or is it only at the
price of having a very large checkpointing interval?

Thanks for your insights.

Mathieu


Remove stackTrace from error response

2021-12-27 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Hi, as part of a security improvement we need to make sure APIs do not return
stack traces as part of the error response.
The JobManager REST API currently does return the stack trace on error.
For example:
❯ curl http://localhost:8081/jobs/123
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Bad 
request, could not parse parameters: Cannot resolve path parameter (jobid) from 
value \"123\".\n\tat 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:187)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat
 java.base/java.util.Optional.ifPresent(Unknown Source)\n\tat 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)\n\tat
 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(A

Re: Avoiding Dynamic Classloading for User Code

2021-12-27 Thread Lior Liviev
And everything happens at launch

From: Lior Liviev 
Sent: Sunday, December 26, 2021 7:38 PM
To: David Morávek 
Cc: user 
Subject: Re: Avoiding Dynamic Classloading for User Code



Hey David,
I placed the jar in the Flink folder, but now I see some weird exceptions that
weren't there before:

  1.  In the Flink logs I repeatedly see "Failed to access job archive location
for path hdfs:/completed-jobs. java.io.FileNotFoundException: File
hdfs:/completed-jobs does not exist." After a couple of minutes it recovered and
was able to read the file.
  2.  At launch I see: "No jobs included in application." As in the first case,
it recovers and is able to find the jobs.

Do you have any idea what the problem could be?

From: David Morávek 
Sent: Thursday, December 23, 2021 2:06 PM
To: Lior Liviev 
Cc: user 
Subject: Re: Avoiding Dynamic Classloading for User Code



Then I don't really know what else to suggest in this direction. This approach
should work in general, if you have control over the classpath and you make sure
all the dependencies play well with each other, since parent-first classloading
doesn't give you the crutches that child-first classloading does.

As mentioned previously, using the CLI instead of the REST API to submit the job
should avoid this issue altogether, as the job driver is executed in a separate
JVM. There is some future work (also mentioned above) to address the known
issues with web submission, but I'm afraid there will always be some edge cases
that we won't be able to handle, because we have no control over the user code.
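
For example, a plain CLI submission (hypothetical entry class and jar path) would
look roughly like:

./bin/flink run -c com.example.MyJob /path/to/my-job.jar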

D.

On Thu, Dec 23, 2021 at 12:54 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
We use Hadoop on EMR 6.4 (if I'm not mistaken, EMR has its own version of
Hadoop, so we don't define it) and we use Flink 1.13.1.

From: David Morávek mailto:d...@apache.org>>
Sent: Thursday, December 23, 2021 1:44 PM
To: Lior Liviev mailto:lior.liv...@earnix.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Avoiding Dynamic Classloading for User Code



Please try to post the whole stack trace next time, but in general it sounds
like you've got conflicting Avro versions on the classpath (actually I'd expect a
version < 1.8 [1] in your user jar / maybe loaded from Hadoop?). What Flink
version are you using? Are you using Flink with Hadoop (which version / how do
you link the Hadoop deps)?

[1] 
https://issues.apache.org/jira/browse/AVRO-1497

On Thu, Dec 23, 2021 at 12:20 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
I get this: Caused by: java.lang.NoSuchMethodError: 
org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
And I'm using avro 1.10

From: David Morávek mailto:d...@apache.org>>
Sent: Thursday, December 23, 2021 12:37 PM
To: Lior Liviev mailto:lior.liv...@earnix.com>>; user 
mailto:user@flink.apache.org>>
Subject: Re: Avoiding Dynamic Classloading for User Code



I guess I'd need more context to answer that. Have you checked the JM logs for 
more details?

On Thu, Dec 23, 2021 at 9:01 AM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
Is there any reason I'm getting "Could not execute application" after I put the 
Jar in /lib?

From: David Morávek mailto:d...@apache.org>>
Sent: Wednesday, December 22, 2021 2:04 PM
To: Lior Liviev mailto:lior.liv...@earnix.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Avoiding Dynamic Classloading for User Code



1. Yes, I'm not aware of a way to avoid it right now when you're submitting the
job via the REST API.
2. Hopefully not; the classes should always be loaded from the parent loader if
they can be found on the classpath... but as I've told you before, this is a
hacky solution which is healing symptoms instead of addressing the cause, and
definitely not a recommended way to submit jobs.
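
For reference, the flink-conf.yaml setting being discussed above is:

classloader.resolve-order: parent-first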

D.

On Wed, Dec 22, 2021 at 12:58 PM Lior Liviev 
mailto:lior.liv...@earnix.com>> wrote:
Hello David,

I have some questions regarding our conversation:

  1.  When I put the JAR in $FLINK/lib, do I need to use your REST API to load 
it?
  2.  If I have my JAR in the folder AND I load the same JAR via REST API, will
I run into problems? (class loading strategy is set to parent-first)


From: David Morávek mailto:d...@apache.org>>
Sent: Tuesday, December 21, 2021 6:53 PM
To: Lior Liviev mailto:lior