Re: Reading from sockets using dataset api

2020-04-24 Thread Arvid Heise
Hi Kaan,

sorry, I haven't considered I/O as the bottleneck. I thought a bit more
about your issue and came to a rather simple solution.

How about you open a socket on each of your generator nodes? Then you
configure your analysis job to read from each of these sockets with a
separate source and union them before feeding them to the actual job?

You don't need to modify much on the analysis job and each source can be
independently read. WDYT?
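
For illustration, a rough sketch of that idea (host names and port are placeholders; shown with the DataStream API's socketTextStream, since the DataSet API has no built-in socket source — a DataSet-side alternative comes up later in this thread):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// One source per generator node (placeholder hosts/port).
DataStream<String> edges1 = env.socketTextStream("generator-node-1", 9000);
DataStream<String> edges2 = env.socketTextStream("generator-node-2", 9000);
DataStream<String> edges3 = env.socketTextStream("generator-node-3", 9000);

// Union the per-node sources into one stream that feeds the actual analysis job.
DataStream<String> allEdges = edges1.union(edges2, edges3);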

On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak  wrote:

> Thanks for the answer! Also thanks for raising some concerns about my
> question.
>
> Some of the graphs I have been using are larger than 1.5 TB, and I am
> currently in the experimental stage of a project, making modifications
> to my code and re-running the experiments. Currently, on some of the
> largest graphs I have been using, IO has become an issue for me and keeps
> me waiting for a couple of hours.
>
> Moreover, I have a parallel/distributed graph generator, which I can run
> on the same set of nodes in my cluster. So what I wanted to do was run
> my Flink program and graph generator at the same time and feed the graph
> through the generator, which should be faster than doing IO from disk. As
> you said, it is not essential for me to do that, but I am trying to see what
> I am able to do using Flink and how I can solve such problems. I was also
> using another framework and faced a similar problem there; I was able to
> reduce the graph read time from hours to minutes using this method.
>
>  Do you really have more main memory than disk space?
>
>
> My issue is actually not storage related; I am trying to see how I can
> reduce the IO time.
>
> One trick that came to my mind is creating a dummy dataset and, using a map
> function on that dataset, opening up a bunch of sockets to listen to the
> generator and collect the generated data. I am trying to see how it will
> turn out.
>
> Alternatively, if graph generation is rather cheap, you could also try to
> incorporate it directly into the analysis job.
>
>
> I am not familiar with the analysis jobs. I will look into it.
>
> Again, this is actually not a problem, I am just trying to experiment with
> the framework and see what I can do. I am very new to Flink, so my methods
> might be wrong. Thanks for the help!
>
> Best
> Kaan
>
>
> On Apr 23, 2020, at 10:51 AM, Arvid Heise  wrote:
>
> Hi Kaan,
>
> afaik there is no (easy) way to switch from streaming back to batch API
> while retaining all data in memory (correct me if I misunderstood).
>
> However, from your description, I also have some severe understanding
> problems. Why can't you dump the data to some file? Do you really have more
> main memory than disk space? Or do you have no shared memory between your
> generating cluster and the flink cluster?
>
> It almost sounds as if the issue at heart is rather to find a good
> serialization format on how to store the edges. The 70 billion edges could
> be stored in an array of id pairs, which amount to ~560 GB uncompressed
> data if stored in Avro (or any other binary serialization format) when ids
> are longs. That's not much by today's standards and could also be easily
> offloaded to S3.
>
> Alternatively, if graph generation is rather cheap, you could also try to
> incorporate it directly into the analysis job.
>
> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak  wrote:
>
>> Hi,
>>
>> I have been running some experiments on large graph data; the smallest graph
>> I have been using has around ~70 billion edges. I have a graph generator
>> which generates the graph in parallel and feeds it to the running system.
>> However, it takes a lot of time to read the edges, because even though the
>> graph generation process is parallel, in Flink I can only listen from the
>> master node (correct me if I am wrong). Another option is dumping the
>> generated data to a file and reading it with readFromCsv; however, this is not
>> feasible in terms of storage management.
>>
>> What I want to do is invoke my graph generator using IPC/TCP
>> protocols and read the generated data from the sockets. Since the graph
>> data is also generated in parallel on each node, I want to make use of IPC
>> and read the data in parallel at each node. I did some online digging but
>> couldn’t find anything similar using the DataSet API. I would be glad if you
>> have some similar use cases or examples.
>>
>> Is it possible to use the streaming environment to create the data in
>> parallel and then switch to the DataSet API?
>>
>> Thanks in advance!
>>
>> Best
>> Kaan
>
>
>
> --
> Arvid Heise | Senior Java Developer
> 
>
> Follow us @VervericaData
> --
> Join Flink Forward  - The Apache Flink
> Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tun

Re: JDBC table api questions

2020-04-24 Thread Flavio Pompermaier
Ok, but how can views then be retrieved via the Table API? If they are returned as
Table objects, is there a way to mark them read-only?
Because VIEWS in Flink SQL are Flink views, so how can I query JDBC views?
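
(For illustration, a minimal sketch of the query a listViews() implementation could run against Postgres, using plain JDBC against information_schema; the helper method and its signature are illustrative, not Flink's actual catalog API — see Zhenghua's note below.)

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: list the views of a given Postgres schema via information_schema.
static List<String> listPostgresViews(Connection connection, String schema) throws SQLException {
    List<String> views = new ArrayList<>();
    String sql = "SELECT table_name FROM information_schema.views WHERE table_schema = ?";
    try (PreparedStatement stmt = connection.prepareStatement(sql)) {
        stmt.setString(1, schema);
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                views.add(rs.getString(1));
            }
        }
    }
    return views;
}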

On Fri, Apr 24, 2020 at 4:22 AM Zhenghua Gao  wrote:

> FLINK-16471 introduces a JDBCCatalog, which implements the Catalog interface.
> Currently we only support PostgresCatalog and listTables().
> If you want to get the list of views, you can implement listViews()
> (which currently returns an empty list).
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Thu, Apr 23, 2020 at 8:48 PM Flavio Pompermaier 
> wrote:
>
>> Hi all,
>> is there a way to get the list of existing views in a JDBC database?
>> Is this something that could be supported somehow?
>>
>> Moreover, it would be interesting for us to also know the original field
>> type of a table... Is there a way to get it (without implementing a dedicated
>> API)? Do you think it makes sense to expose it in the Table API?
>>
>> Best,
>> Flavio
>>
>


Re: Flink

2020-04-24 Thread Konstantin Knauf
Hi Navneeth,

I think there might be some misunderstanding here. Let me try to clarify.

1) The so-called native Kubernetes support [1], which was added as an
experimental feature in Flink 1.10, is not used by Ververica Platform CE
nor by the Lyft K8s Operator, as far as I am aware.

2) The native Kubernetes support only supports session clusters [1] so far.
Support for job clusters is work in progress for Flink 1.11. In the case of a
session cluster, Flink will request additional resources from Kubernetes
whenever necessary to deploy a Flink job upon submission. So, there is
automatic scaling of the *cluster*, but no automatic scaling of the *jobs*
(a minimal example of starting such a session cluster is sketched below).

3) There are multiple efforts within the Flink ecosystem working on auto
scaling jobs.
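
For reference, a rough sketch of what this looks like on the command line with Flink 1.10 (the cluster id and resource values are placeholders; see the native Kubernetes documentation in [1] for the full set of options):

# Start a Flink session cluster natively on Kubernetes (illustrative values).
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-flink-session \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4

# Submit a job to that session; Flink asks Kubernetes for TaskManager pods as needed.
./bin/flink run -d -e kubernetes-session \
  -Dkubernetes.cluster-id=my-flink-session \
  examples/streaming/WindowJoin.jar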

I hope this helps and please reach out if you have further questions.

Best,

Konstantin

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/glossary.html#flink-session-cluster


On Thu, Apr 23, 2020 at 7:11 AM Navneeth Krishnan 
wrote:

> Thanks a lot Timo. I will take a look at it. But does flink automatically
> scale up and down at this point with native integration?
>
> Thanks
>
> On Tue, Apr 14, 2020 at 11:27 PM Timo Walther  wrote:
>
>> Hi Navneeth,
>>
>> it might also be worth looking into Ververica Platform for this. The
>> community edition was published recently and is free of charge. It provides
>> first-class K8s support [1].
>>
>> There is also a tutorial on how to deploy it on EKS [2] (not the most
>> recent one, though).
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://www.ververica.com/blog/announcing-ververica-platform-community-edition?utm_campaign=Ververica%20Platform%20-%20Community%20Edition&utm_content=123140986&utm_medium=social&utm_source=twitter&hss_channel=tw-2581958070
>> [2]
>>
>> https://www.ververica.com/blog/how-to-get-started-with-data-artisans-platform-on-aws-eks
>>
>>
>>
>> On 15.04.20 03:57, Navneeth Krishnan wrote:
>> > Hi All,
>> >
>> > I'm very new to EKS and trying to deploy a flink job in cluster mode.
>> > Are there any good documentations on what are the steps to deploy on
>> EKS?
>> >
>> >  From my understanding, with flink 1.10 running it on EKS will
>> > automatically scale up and down with kubernetes integration based on
>> the
>> > load. Is this correct? Do I have to enable some configs to support
>> > this feature?
>> >
>> > How to use the lyft k8s operator when deploying on EKS?
>> >
>> > Thanks a lot, appreciate all the help.
>> >
>> >
>> >
>> >
>>
>>

-- 

Konstantin Knauf | Head of Product

+49 160 91394525


Follow us @VervericaData Ververica 


--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Re: Flink 1.10 Out of memory

2020-04-24 Thread Stephan Ewen
@Xintong - out of curiosity, where do you see that this tries to fork a
process? I must be overlooking something, I could only see the native
method call.

On Fri, Apr 24, 2020 at 4:53 AM Xintong Song  wrote:

> @Stephan,
> I don't think so. If JVM hits the direct memory limit, you should see the
> error message "OutOfMemoryError: Direct buffer memory".
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:
>
>> @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
>> limit here?
>> Would increasing the "taskmanager.memory.framework.off-heap.size" help?
>>
>> On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
>> wrote:
>>
>>> As you can see from the task manager tab of flink web dashboard
>>>
>>> Physical Memory:3.80 GB
>>> JVM Heap Size:1.78 GB
>>> Flink Managed Memory:128 MB
>>>
>>> *Flink is only using 128 MB, which can easily cause an OOM*
>>> *error.*
>>>
>>> *These are DEFAULT settings.*
>>>
>>> *I dusted off an old laptop so it only 3.8 GB RAM.*
>>>
>>> What does your job metrics say  ?
>>>
>>> On Mon, 20 Apr 2020, 07:26 Xintong Song,  wrote:
>>>
 Hi Lasse,

 From what I understand, your problem is that JVM tries to fork some
 native process (if you look at the exception stack the root exception is
 thrown from a native method) but there's not enough memory for doing that.
 This could happen when either Mesos is using cgroup strict mode for memory
 control, or there's no more memory on the machine. Flink cannot prevent
 native processes from using more memory. It can only reserve certain amount
 of memory for such native usage when requesting worker memory from the
 deployment environment (in your case Mesos) and allocating Java heap /
 direct memory.

 My suggestion is to try increasing the JVM overhead configuration. You
 can leverage the configuration options
 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
 the documentation[1].

 Thank you~

 Xintong Song


 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max

 On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman 
 wrote:

> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
>> Hi.
>>
>> We have migrated to Flink 1.10 and face an out of memory exception;
>> hopefully someone can point us in the right direction.
>>
>> We have a job that uses broadcast state, and we sometimes get out of
>> memory when it creates a savepoint. See the stack trace below.
>> We have assigned 2.2 GB/task manager and
>> configured  taskmanager.memory.process.size : 2200m
>> In Flink 1.9 our container was terminated because of OOM, so 1.10 does a
>> better job, but it is still not working and the task manager is leaking memory
>> for each OOM and is finally killed by Mesos
>>
>>
>> Any idea what we can do to figure out what settings we need to change?
>>
>> Thanks in advance
>>
>> Lasse Nedergaard
>>
>>
>> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory -
>> Could not close the state stream for
>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>> java.io.IOException: Cannot allocate memory at
>> java.io.FileOutputStream.writeBytes(Native Method) at
>> java.io.FileOutputStream.write(FileOutputStream.java:326) at
>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
>> java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
>> java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>> at
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> at
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>> at
>> org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> at
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
>> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) at
>> org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) at
>> org.apache.flink.util.AbstractCloseableRegistry.cl

Re: Reading from sockets using dataset api

2020-04-24 Thread Kaan Sancak
Yes, that sounds like a great idea and actually that's what I am trying to do.

>  Then you configure your analysis job to read from each of these sockets with 
> a separate source and union them before feeding them to the actual job?

Before trying to open the sockets on the slave nodes, I first opened just 
one socket on the master node, and I also ran the generator with one node. 
I was able to read the graph and then run my algorithm without any problems. 
This was a test run to see whether I could do it.

After that, I opened a bunch of sockets on my generators, and now I am trying to 
configure Flink to read from those sockets. However, I am having problems while 
trying to assign each task manager to a separate socket. I am assuming my 
problems are related to network binding. In my configuration file,  
jobmanager.rpc.address is set, but I have not done a similar configuration for 
the slave nodes.

Am I on the right track, or is there an easier way to handle this?

I think my point is how to do the `read from each of these sockets with a separate 
source` part.
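
One possible way to do that with the DataSet API (a rough sketch; host names, port, and the line format are assumptions, and Arvid's InputFormat-based suggestion later in the thread is an alternative) is to seed a dummy DataSet with one element per generator endpoint and open the socket inside a flatMap, so each parallel subtask reads its own socket:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// One element per generator endpoint; rebalance() spreads them across subtasks.
DataSet<Integer> generatorIds = env.fromElements(0, 1, 2, 3).rebalance();

DataSet<Tuple2<Long, Long>> edges = generatorIds.flatMap(
    new FlatMapFunction<Integer, Tuple2<Long, Long>>() {
        @Override
        public void flatMap(Integer generatorId, Collector<Tuple2<Long, Long>> out) throws Exception {
            // Each subtask connects to "its" generator socket and streams edges until EOF.
            try (Socket socket = new Socket("generator-node-" + generatorId, 9000);
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(";");
                    out.collect(new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
                }
            }
        }
    });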

Thanks again

Best
Kaan
 


> On Apr 24, 2020, at 3:11 AM, Arvid Heise  wrote:
> 
> Hi Kaan,
> 
> sorry, I haven't considered I/O as the bottleneck. I thought a bit more about 
> your issue and came to a rather simple solution.
> 
> How about you open a socket on each of your generator nodes? Then you 
> configure your analysis job to read from each of these sockets with a 
> separate source and union them before feeding them to the actual job?
> 
> You don't need to modify much on the analysis job and each source can be 
> independently read. WDYT?
> 
> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak  > wrote:
> Thanks for the answer! Also thanks for raising some concerns about my 
> question.
> 
> Some of the graphs I have been using is larger than 1.5 tb, and I am 
> currently an experiment stage of a project, and I am making modifications to 
> my code and re-runing the experiments again. Currently, on some of the 
> largest graphs I have been using, IO became an issue for me and keeps me wait 
> for couple of hours.
> 
> Moreover, I have a parallel/distributed graph generator, which I can run on 
> the same set of nodes in my cluster. So what I wanted to do was, to run my 
> Flink program and graph generator at the same time and feed the graph through 
> generator, which should be faster than making IO from the disk. As you said, 
> it is not essential for me to that, but I am trying to see what I am able to 
> do using Flink and how can I solve such problems. I was also using another 
> framework, and faced with the similar problem, I was able to reduce the graph 
> read time from hours to minutes using this method.
> 
>>  Do you really have more main memory than disk space?
> 
> My issue is actually not storage related, I am trying to see how can I reduce 
> the IO time.  
> 
> One trick came to my mind is, creating dummy dataset, and using a map 
> function on the dataset, I can open-up bunch of sockets and listen the 
> generator, and collect the generated data. I am trying to see how it will 
> turn out.
> 
>> Alternatively, if graph generation is rather cheap, you could also try to 
>> incorporate it directly into the analysis job.
> 
> I am not familiar with the analysis jobs. I will look into it.
> 
> Again, this is actually not a problem, I am just trying to experiment with 
> the framework and see what I can do. I am very new to Flink, so my methods 
> might be wrong. Thanks for the help! 
> 
> Best
> Kaan
> 
> 
>> On Apr 23, 2020, at 10:51 AM, Arvid Heise > > wrote:
>> 
>> Hi Kaan,
>> 
>> afaik there is no (easy) way to switch from streaming back to batch API 
>> while retaining all data in memory (correct me if I misunderstood).
>> 
>> However, from your description, I also have some severe understanding 
>> problems. Why can't you dump the data to some file? Do you really have more 
>> main memory than disk space? Or do you have no shared memory between your 
>> generating cluster and the flink cluster?
>> 
>> It almost sounds as if the issue at heart is rather to find a good 
>> serialization format on how to store the edges. The 70 billion edges could 
>> be stored in an array of id pairs, which amount to ~560 GB uncompressed data 
>> if stored in Avro (or any other binary serialization format) when ids are 
>> longs. That's not much by today's standards and could also be easily 
>> offloaded to S3.
>> 
>> Alternatively, if graph generation is rather cheap, you could also try to 
>> incorporate it directly into the analysis job.
>> 
>> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak > > wrote:
>> Hi,
>> 
>> I have been running some experiments on  large graph data, smallest graph I 
>> have been using is around ~70 billion edges. I have a graph generator, which 
>> generates the graph in parallel and feeds to the running system. However, it 
>> takes a lo

Re: Flink 1.10 Out of memory

2020-04-24 Thread Xintong Song
I might be wrong about how JNI works. Isn't a native method always executed
in another process?

I was searching for the java error message "Cannot allocate memory", and it
seems this happens when JVM cannot allocate memory from the OS. Given the
exception is thrown from calling a native method, I think the problem is
that not enough native memory can be allocated for executing the native
method.

Thank you~

Xintong Song



On Fri, Apr 24, 2020 at 3:40 PM Stephan Ewen  wrote:

> @Xintong - out of curiosity, where do you see that this tries to fork a
> process? I must be overlooking something, I could only see the native
> method call.
>
> On Fri, Apr 24, 2020 at 4:53 AM Xintong Song 
> wrote:
>
>> @Stephan,
>> I don't think so. If JVM hits the direct memory limit, you should see the
>> error message "OutOfMemoryError: Direct buffer memory".
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:
>>
>>> @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
>>> limit here?
>>> Would increasing the "taskmanager.memory.framework.off-heap.size" help?
>>>
>>> On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
>>> wrote:
>>>
 As you can see from the task manager tab of flink web dashboard

 Physical Memory:3.80 GB
 JVM Heap Size:1.78 GB
 Flink Managed Memory:128 MB

 *Flink is only using 128M MB which can easily cause OOM*
 *error.*

 *These are DEFAULT settings.*

 *I dusted off an old laptop so it only 3.8 GB RAM.*

 What does your job metrics say  ?

 On Mon, 20 Apr 2020, 07:26 Xintong Song,  wrote:

> Hi Lasse,
>
> From what I understand, your problem is that JVM tries to fork some
> native process (if you look at the exception stack the root exception is
> thrown from a native method) but there's no enough memory for doing that.
> This could happen when either Mesos is using cgroup strict mode for memory
> control, or there's no more memory on the machine. Flink cannot prevent
> native processes from using more memory. It can only reserve certain 
> amount
> of memory for such native usage when requesting worker memory from the
> deployment environment (in your case Mesos) and allocating Java heap /
> direct memory.
>
> My suggestion is to try increasing the JVM overhead configuration. You
> can leverage the configuration options
> 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
> the documentation[1].
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max
>
> On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman 
> wrote:
>
>>
>> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
>> lassenedergaardfl...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> We have migrated to Flink 1.10 and face out of memory exception and
>>> hopeful can someone point us in the right direction.
>>>
>>> We have a job that use broadcast state, and we sometimes get out
>>> memory when it creates a savepoint. See stacktrack below.
>>> We have assigned 2.2 GB/task manager and
>>> configured  taskmanager.memory.process.size : 2200m
>>> In Flink 1.9 our container was terminated because OOM, so 1.10 do a
>>> better job, but it still not working and the task manager is leaking mem
>>> for each OOM and finial kill by Mesos
>>>
>>>
>>> Any idea what we can do to figure out what settings we need to
>>> change?
>>>
>>> Thanks in advance
>>>
>>> Lasse Nedergaard
>>>
>>>
>>> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory -
>>> Could not close the state stream for
>>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>>> java.io.IOException: Cannot allocate memory at
>>> java.io.FileOutputStream.writeBytes(Native Method) at
>>> java.io.FileOutputStream.write(FileOutputStream.java:326) at
>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
>>> at
>>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
>>> java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
>>> java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>>> at
>>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>> at
>>> org.apache.hadoop.fs.

Re: Reading from sockets using dataset api

2020-04-24 Thread Arvid Heise
Hm, I mistakenly thought sockets worked the other way around (pulling, like
URLInputStream, instead of listening). I'd go with providing the data on a
port on each generator node and then reading from that in multiple sources.

I think the best solution is to implement a custom InputFormat and then use
readInput. You could implement a subclass of GenericInputFormat. You might
even use IteratorInputFormat like this:

private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {
    private final URL url;
    private Iterator<Tuple2<Long, Long>> inner;

    private URLInputIterator(URL url) {
        this.url = url;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        InputStream inputStream = url.openStream();
        inner = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
                .lines()
                .map(line -> {
                    String[] parts = line.split(";");
                    return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
                })
                .iterator();
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public Tuple2<Long, Long> next() {
        return inner.next();
    }
}

env.fromCollection(new URLInputIterator(new URL("gen_node1", )),
        Types.TUPLE(Types.LONG, Types.LONG));




On Fri, Apr 24, 2020 at 9:42 AM Kaan Sancak  wrote:

> Yes, that sounds like a great idea and actually that's what I am trying to
> do.
>
>  Then you configure your analysis job to read from each of these sockets
> with a separate source and union them before feeding them to the actual job?
>
>
> Before trying to open the sockets on the slave nodes, first I have opened
> just one socket at master node, and I also run the generator with one node
> as well. I was able to read the graph, and the run my algorithm without any
> problems. This was a test run to see whatever I can do it.
>
> After, I have opened bunch of sockets on my generators, now I am trying to
> configure Flink to read from those sockets. However, I am having problems
> while trying to assign each task manager to a separate socket. I am
> assuming my problems are related to network binding. In my configuration
> file,  jobmanager.rpc.address is set but I have not done
> similar configurations for slave nodes.
>
> Am I on the right track, or is there an easier way to handle this?
>
> I think my point is how to do `read from each of these sockets with a
> separate source` part.
>
> Thanks again
>
> Best
> Kaan
>
>
>
> On Apr 24, 2020, at 3:11 AM, Arvid Heise  wrote:
>
> Hi Kaan,
>
> sorry, I haven't considered I/O as the bottleneck. I thought a bit more
> about your issue and came to a rather simple solution.
>
> How about you open a socket on each of your generator nodes? Then you
> configure your analysis job to read from each of these sockets with a
> separate source and union them before feeding them to the actual job?
>
> You don't need to modify much on the analysis job and each source can be
> independently read. WDYT?
>
> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak  wrote:
>
>> Thanks for the answer! Also thanks for raising some concerns about my
>> question.
>>
>> Some of the graphs I have been using is larger than 1.5 tb, and I am
>> currently an experiment stage of a project, and I am making modifications
>> to my code and re-runing the experiments again. Currently, on some of the
>> largest graphs I have been using, IO became an issue for me and keeps me
>> wait for couple of hours.
>>
>> Moreover, I have a parallel/distributed graph generator, which I can run
>> on the same set of nodes in my cluster. So what I wanted to do was, to run
>> my Flink program and graph generator at the same time and feed the graph
>> through generator, which should be faster than making IO from the disk. As
>> you said, it is not essential for me to that, but I am trying to see what I
>> am able to do using Flink and how can I solve such problems. I was also
>> using another framework, and faced with the similar problem, I was able to
>> reduce the graph read time from hours to minutes using this method.
>>
>>  Do you really have more main memory than disk space?
>>
>>
>> My issue is actually not storage related, I am trying to see how can I
>> reduce the IO time.
>>
>> One trick came to my mind is, creating dummy dataset, and using a map
>> function on the dataset, I can open-up bunch of sockets and listen the
>> generator, and collect the generated data. I am trying to see how it will
>> turn out.
>>
>> Alternatively, if graph generation is rather cheap, you could also try to
>> incorporate it directly into the analysis job.
>>
>>
>> I am not familiar with the analysis jobs. I will look into it.
>>
>> Again, this is actually not a problem, I am just trying to experiment
>> with the framework and see what I can do. I am very new to Flink, so my
>> methods might be wrong. Thanks for the help!
>>
>> Best
>> Kaan
>>
>>
>> On Apr 23, 2020, at 10:51 AM, Arvid He

Re: Flink 1.10 Out of memory

2020-04-24 Thread Stephan Ewen
I think native methods are not in a forked process. It is just a malloc()
call that failed, probably an I/O buffer or so.
This might mean that there really is no native memory available any more,
meaning the process has hit its limit. In any case, a bit more JVM overhead
should solve this.
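
(For reference, the relevant flink-conf.yaml keys mentioned earlier in the thread; the values below are illustrative only and need to be tuned per deployment.)

taskmanager.memory.process.size: 2200m
# Reserve more room for allocations made outside Flink's memory accounting (JNI calls, I/O buffers, ...).
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 512m
taskmanager.memory.jvm-overhead.fraction: 0.2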

On Fri, Apr 24, 2020 at 10:24 AM Xintong Song  wrote:

> I might be wrong about how JNI works. Isn't a native method always
> executed in another process?
>
> I was searching for the java error message "Cannot allocate memory", and
> it seems this happens when JVM cannot allocate memory from the OS. Given
> the exception is thrown from calling a native method, I think the problem
> is that not enough native memory can be allocated for executing the native
> method.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Apr 24, 2020 at 3:40 PM Stephan Ewen  wrote:
>
>> @Xintong - out of curiosity, where do you see that this tries to fork a
>> process? I must be overlooking something, I could only see the native
>> method call.
>>
>> On Fri, Apr 24, 2020 at 4:53 AM Xintong Song 
>> wrote:
>>
>>> @Stephan,
>>> I don't think so. If JVM hits the direct memory limit, you should see
>>> the error message "OutOfMemoryError: Direct buffer memory".
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:
>>>
 @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
 limit here?
 Would increasing the "taskmanager.memory.framework.off-heap.size" help?

 On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
 wrote:

> As you can see from the task manager tab of flink web dashboard
>
> Physical Memory:3.80 GB
> JVM Heap Size:1.78 GB
> Flink Managed Memory:128 MB
>
> *Flink is only using 128M MB which can easily cause OOM*
> *error.*
>
> *These are DEFAULT settings.*
>
> *I dusted off an old laptop so it only 3.8 GB RAM.*
>
> What does your job metrics say  ?
>
> On Mon, 20 Apr 2020, 07:26 Xintong Song, 
> wrote:
>
>> Hi Lasse,
>>
>> From what I understand, your problem is that JVM tries to fork some
>> native process (if you look at the exception stack the root exception is
>> thrown from a native method) but there's no enough memory for doing that.
>> This could happen when either Mesos is using cgroup strict mode for 
>> memory
>> control, or there's no more memory on the machine. Flink cannot prevent
>> native processes from using more memory. It can only reserve certain 
>> amount
>> of memory for such native usage when requesting worker memory from the
>> deployment environment (in your case Mesos) and allocating Java heap /
>> direct memory.
>>
>> My suggestion is to try increasing the JVM overhead configuration.
>> You can leverage the configuration options
>> 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
>> the documentation[1].
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max
>>
>> On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman 
>> wrote:
>>
>>>
>>> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>>
>>> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
>>> lassenedergaardfl...@gmail.com> wrote:
>>>
 Hi.

 We have migrated to Flink 1.10 and face out of memory exception and
 hopeful can someone point us in the right direction.

 We have a job that use broadcast state, and we sometimes get out
 memory when it creates a savepoint. See stacktrack below.
 We have assigned 2.2 GB/task manager and
 configured  taskmanager.memory.process.size : 2200m
 In Flink 1.9 our container was terminated because OOM, so 1.10 do a
 better job, but it still not working and the task manager is leaking 
 mem
 for each OOM and finial kill by Mesos


 Any idea what we can do to figure out what settings we need to
 change?

 Thanks in advance

 Lasse Nedergaard


 WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory -
 Could not close the state stream for
 s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
 java.io.IOException: Cannot allocate memory at
 java.io.FileOutputStream.writeBytes(Native Method) at
 java.io.FileOutputStream.write(FileOutputStream.java:326) at
 java.io.BufferedOutputStream.flushBuffer(BufferedOu

Re: Flink 1.10 Out of memory

2020-04-24 Thread Xintong Song
True. Thanks for the clarification.

Thank you~

Xintong Song



On Fri, Apr 24, 2020 at 5:21 PM Stephan Ewen  wrote:

> I think native methods are not in a forked process. It is just a malloc()
> call that failed, probably an I/O buffer or so.
> This might mean that there really is no native memory available any more,
> meaning the process has hit its limit. In any case, a bit more JVM overhead
> should solve this.
>
> On Fri, Apr 24, 2020 at 10:24 AM Xintong Song 
> wrote:
>
>> I might be wrong about how JNI works. Isn't a native method always
>> executed in another process?
>>
>> I was searching for the java error message "Cannot allocate memory", and
>> it seems this happens when JVM cannot allocate memory from the OS. Given
>> the exception is thrown from calling a native method, I think the problem
>> is that not enough native memory can be allocated for executing the native
>> method.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Apr 24, 2020 at 3:40 PM Stephan Ewen  wrote:
>>
>>> @Xintong - out of curiosity, where do you see that this tries to fork a
>>> process? I must be overlooking something, I could only see the native
>>> method call.
>>>
>>> On Fri, Apr 24, 2020 at 4:53 AM Xintong Song 
>>> wrote:
>>>
 @Stephan,
 I don't think so. If JVM hits the direct memory limit, you should see
 the error message "OutOfMemoryError: Direct buffer memory".

 Thank you~

 Xintong Song



 On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:

> @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
> limit here?
> Would increasing the "taskmanager.memory.framework.off-heap.size" help?
>
> On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
> wrote:
>
>> As you can see from the task manager tab of flink web dashboard
>>
>> Physical Memory:3.80 GB
>> JVM Heap Size:1.78 GB
>> Flink Managed Memory:128 MB
>>
>> *Flink is only using 128M MB which can easily cause OOM*
>> *error.*
>>
>> *These are DEFAULT settings.*
>>
>> *I dusted off an old laptop so it only 3.8 GB RAM.*
>>
>> What does your job metrics say  ?
>>
>> On Mon, 20 Apr 2020, 07:26 Xintong Song, 
>> wrote:
>>
>>> Hi Lasse,
>>>
>>> From what I understand, your problem is that JVM tries to fork some
>>> native process (if you look at the exception stack the root exception is
>>> thrown from a native method) but there's no enough memory for doing 
>>> that.
>>> This could happen when either Mesos is using cgroup strict mode for 
>>> memory
>>> control, or there's no more memory on the machine. Flink cannot prevent
>>> native processes from using more memory. It can only reserve certain 
>>> amount
>>> of memory for such native usage when requesting worker memory from the
>>> deployment environment (in your case Mesos) and allocating Java heap /
>>> direct memory.
>>>
>>> My suggestion is to try increasing the JVM overhead configuration.
>>> You can leverage the configuration options
>>> 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details 
>>> in
>>> the documentation[1].
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max
>>>
>>> On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman 
>>> wrote:
>>>

 https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/

 Backbutton.co.uk
 ¯\_(ツ)_/¯
 ♡۶Java♡۶RMI ♡۶
 Make Use Method {MUM}
 makeuse.org
 


 On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
 lassenedergaardfl...@gmail.com> wrote:

> Hi.
>
> We have migrated to Flink 1.10 and face out of memory exception
> and hopeful can someone point us in the right direction.
>
> We have a job that use broadcast state, and we sometimes get out
> memory when it creates a savepoint. See stacktrack below.
> We have assigned 2.2 GB/task manager and
> configured  taskmanager.memory.process.size : 2200m
> In Flink 1.9 our container was terminated because OOM, so 1.10 do
> a better job, but it still not working and the task manager is 
> leaking mem
> for each OOM and finial kill by Mesos
>
>
> Any idea what we can do to figure out what settings we need to
> change?
>
> Thanks in advance
>
> Lasse Nedergaard
>
>
> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory
> - Could not close the state stream for
> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0

Re: Handling stale data enrichment

2020-04-24 Thread Vinay Patil
Hi Konstantin,

Thank you for your answer.

Yes, we have timestamps in the subscription stream

>the disadvantage that you do not make any progress until you see fresh
subscription data. Is this the desired behavior for your use case?
No, this is not acceptable. The reason is that the subscription data might be
slow changing. Let's say it is not getting updated for 6 hrs. In this case
the click stream events are continuously flowing, and we want to enrich them
against the slow-moving stream.

In case of event time join/low level joins, I am assuming that the
watermarks will still make progress because of
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources.
Or do we still have to handle it in the assigner and emit a watermark if we
are not receiving elements for a while ? (not sure how this will work in
case of low level joins)

I am considering using the low-level join approach with connected streams,
where I will keep the reference data in state (processElement1) and the
click stream events (processElement2) and join them. In this case I will
buffer the click stream events for a configurable period of
time and then delete them. (This is to handle late records.)
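
A minimal sketch of such a function (the types SubscriptionUpdate, ClickEvent, EnrichedClick and the retention constant are assumptions for illustration, not from the original mails):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SubscriptionEnrichment
        extends KeyedCoProcessFunction<String, SubscriptionUpdate, ClickEvent, EnrichedClick> {

    private static final long RETENTION_MS = 2 * 60 * 60 * 1000; // keep unenriched clicks for 2 h

    private transient ValueState<SubscriptionUpdate> latestSubscription;
    private transient ListState<ClickEvent> bufferedClicks;

    @Override
    public void open(Configuration parameters) {
        latestSubscription = getRuntimeContext().getState(
                new ValueStateDescriptor<>("subscription", SubscriptionUpdate.class));
        bufferedClicks = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-clicks", ClickEvent.class));
    }

    @Override
    public void processElement1(SubscriptionUpdate update, Context ctx,
            Collector<EnrichedClick> out) throws Exception {
        latestSubscription.update(update);
        // Flush clicks that were buffered while no subscription data was available.
        for (ClickEvent click : bufferedClicks.get()) {
            out.collect(new EnrichedClick(click, update));
        }
        bufferedClicks.clear();
    }

    @Override
    public void processElement2(ClickEvent click, Context ctx,
            Collector<EnrichedClick> out) throws Exception {
        SubscriptionUpdate subscription = latestSubscription.value();
        if (subscription != null) {
            out.collect(new EnrichedClick(click, subscription));
        } else {
            // No reference data yet: buffer the click and register a timer to expire it later.
            bufferedClicks.add(click);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<EnrichedClick> out) throws Exception {
        // Retention expired: drop clicks that were never enriched.
        bufferedClicks.clear();
    }
}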

I think the downstream consumer of enriched data will have to dedup the
duplicate records or else we will end up having stale enrichment.

Regards,
Vinay Patil


On Fri, Apr 24, 2020 at 12:14 PM Konstantin Knauf  wrote:

> Hi Vinay,
>
> I assume your subscription updates also have a timestamp and a watermark.
> Otherwise, there is no way for Flink to tell that the subscription updates
> are late.
>
> If you use a "temporal table "-style join to join the two streams, and you
> do not receive any subscription updates for 2 hours, the watermark will not
> advance (it is the minimum of the two input streams) and hence all click
> events will be buffered. No output. This has the advantage of not sending
> out duplicate records, but the disadvantage that you do not make any
> progress until you see fresh subscription data. Is this the desired
> behavior for your use case?
>
> Best,
>
> Konstantin
>
>
> On Thu, Apr 23, 2020 at 1:29 PM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I went through Konstantin webinar on 99 ways you can do enrichment. One
>> thing I am failing to understand is how do we efficiently handle stale data
>> enrichment.
>>
>> Context: Let's say I want to enrich user data with the subscription data.
>> Here subscription data is acting as reference data and will be used for
>> joining these two streams based on event time. Consider the following
>> scenario:
>>
>>
>>1. We are going to enrich Click Stream event (containing user_info)
>>with Subscription details
>>2. Subscription Status for Alice user is FREE
>>3. Current Internal State contains Alice with Subscription status as
>>FREE
>>4.
>>
>>Reference data is not flowing because of some issue for 2hrs
>>5.
>>
>>Alice upgraded the subscription to Premium at 10.30 AM
>>6.
>>
>>Watched video event comes for Alice at 10.40 AM
>>-
>>
>>   flink pipeline looks up in internal state and writes to enrichment
>>   topic
>>   -
>>
>>   Enrichment topic now contains Alice -> FREE
>>   7.
>>
>>Reference data starts flowing in at 11AM
>>-
>>
>>   let's assume we consider late elements upto 2 hours, so the click
>>   stream event of Alice is still buffered in the state
>>   - The enrichment topic will now contain duplicate records for
>>   Alice because of multiple firings of window
>>1. Alice -> FREE -> 10 AM
>>   2. Alice -> PREMIUM -> 11 AM
>>
>> Question is how do I avoid sending duplicate records ? I am not able to
>> understand it. I can think of Low Level joins but not sure how do we know
>> if it is stale data or not based on timestamp (watermark) as it can happen
>> that a particular enriched record is not updated for 6 hrs.
>>
>> Regards,
>> Vinay Patil
>>
>
>
> --
>
> Konstantin Knauf
>


Re: Flink 1.10 Out of memory

2020-04-24 Thread Zahid Rahman
https://youtu.be/UEkjRN8jRx4  22:10


- One option is to reduce Flink managed memory from the default 70% to maybe 50%.

- This error could also be caused by missing memory:

- the programmer maintaining a local list, thereby over-using user-allocated
memory through heavy processing;

- or using a small JVM;

- or the JVM spending too much time on GC.

Out of memory has nothing to do with Flink; Flink is not at fault.


This process is known as "pimping" Flink.

Also, part of pimping is to use the local disk for memory spill.

On Fri, 24 Apr 2020, 03:53 Xintong Song,  wrote:

> @Stephan,
> I don't think so. If JVM hits the direct memory limit, you should see the
> error message "OutOfMemoryError: Direct buffer memory".
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:
>
>> @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
>> limit here?
>> Would increasing the "taskmanager.memory.framework.off-heap.size" help?
>>
>> On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
>> wrote:
>>
>>> As you can see from the task manager tab of flink web dashboard
>>>
>>> Physical Memory:3.80 GB
>>> JVM Heap Size:1.78 GB
>>> Flink Managed Memory:128 MB
>>>
>>> *Flink is only using 128M MB which can easily cause OOM*
>>> *error.*
>>>
>>> *These are DEFAULT settings.*
>>>
>>> *I dusted off an old laptop so it only 3.8 GB RAM.*
>>>
>>> What does your job metrics say  ?
>>>
>>> On Mon, 20 Apr 2020, 07:26 Xintong Song,  wrote:
>>>
 Hi Lasse,

 From what I understand, your problem is that JVM tries to fork some
 native process (if you look at the exception stack the root exception is
 thrown from a native method) but there's no enough memory for doing that.
 This could happen when either Mesos is using cgroup strict mode for memory
 control, or there's no more memory on the machine. Flink cannot prevent
 native processes from using more memory. It can only reserve certain amount
 of memory for such native usage when requesting worker memory from the
 deployment environment (in your case Mesos) and allocating Java heap /
 direct memory.

 My suggestion is to try increasing the JVM overhead configuration. You
 can leverage the configuration options
 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
 the documentation[1].

 Thank you~

 Xintong Song


 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max

 On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman 
 wrote:

> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
>> Hi.
>>
>> We have migrated to Flink 1.10 and face out of memory exception and
>> hopeful can someone point us in the right direction.
>>
>> We have a job that use broadcast state, and we sometimes get out
>> memory when it creates a savepoint. See stacktrack below.
>> We have assigned 2.2 GB/task manager and
>> configured  taskmanager.memory.process.size : 2200m
>> In Flink 1.9 our container was terminated because OOM, so 1.10 do a
>> better job, but it still not working and the task manager is leaking mem
>> for each OOM and finial kill by Mesos
>>
>>
>> Any idea what we can do to figure out what settings we need to change?
>>
>> Thanks in advance
>>
>> Lasse Nedergaard
>>
>>
>> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory -
>> Could not close the state stream for
>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>> java.io.IOException: Cannot allocate memory at
>> java.io.FileOutputStream.writeBytes(Native Method) at
>> java.io.FileOutputStream.write(FileOutputStream.java:326) at
>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
>> java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
>> java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>> at
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> at
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>> at
>> org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> at
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.ja

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-24 Thread Robert Metzger
Thanks for opening the ticket. I've asked a committer who knows the
streaming sink well to take a look at the ticket.

On Fri, Apr 24, 2020 at 6:47 AM Lu Niu  wrote:

> Hi, Robert
>
> BTW, I did some field study and I think it's possible to support streaming
> sink using presto s3 filesystem. I think that would help user to use presto
> s3 fs in all access to s3. I created this jira ticket
> https://issues.apache.org/jira/browse/FLINK-17364 . what do you think?
>
> Best
> Lu
>
> On Tue, Apr 21, 2020 at 1:46 PM Lu Niu  wrote:
>
>> Cool, thanks!
>>
>> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger 
>> wrote:
>>
>>> I'm not aware of anything. I think the presto s3 file system is
>>> generally the recommended S3 FS implementation.
>>>
>>> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>>>
 Thank you both. Given the debug overhead, I might just try out presto
 s3 file system then. Besides that presto s3 file system doesn't support
 streaming sink, is there anything else I need to keep in mind? Thanks!

 Best
 Lu

 On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
 wrote:

> Hey,
> Others have experienced this as well, yes:
> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
> I have also notified the Hadoop project about this issue:
> https://issues.apache.org/jira/browse/HADOOP-15915
>
> I agree with Congxian: You could try reaching out to the Hadoop user@
> list for additional help. Maybe logging on DEBUG level helps already?
> If you are up for an adventure, you could also consider adding some
> debugging code into Hadoop's DiskChecker and compile a custom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
> wrote:
>
>> Hi LU
>>
>> I'm not familiar with S3 file system, maybe others in Flink community
>> can help you in this case, or maybe you can also reach out to s3
>> teams/community for help.
>>
>> Best,
>> Congxian
>>
>>
>> Lu Niu  于2020年4月8日周三 上午11:05写道:
>>
>>> Hi, Congxiao
>>>
>>> Thanks for replying. yeah, I also found those references. However,
>>> as I mentioned in original post, there is enough capacity in all disk.
>>> Also, when I switch to presto file system, the problem goes away. 
>>> Wondering
>>> whether others encounter similar issue.
>>>
>>> Best
>>> Lu
>>>
>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 From the stack, seems the problem is that "
 org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.
 util.DiskChecker$DiskErrorException: Could not find any valid
 local directory for s3ablock-0001-", and I googled the exception, found
 there is some relative page[1], could you please make sure there is 
 enough
 space on the local dis.

 [1]
 https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
 Best,
 Congxian


 Lu Niu  于2020年4月8日周三 上午8:41写道:

> Hi, flink users
>
> Did anyone encounter such error? The error comes from
> S3AFileSystem. But there is no capacity issue on any disk. we are 
> using
> hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for 
> state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(

Re: Flink 1.10 Out of memory

2020-04-24 Thread Stephan Ewen
@zahid I would kindly ask you to rethink your approach to posting here.
Wanting to help answer questions is appreciated, but what you post is
always completely disconnected from the actual issue.

The questions here usually go beyond the obvious and beyond what a simple
Stack Overflow search yields. That's why they are posted here and the
person asking is not simply searching Stack Overflow themselves.

So please either make the effort to really dig into the problem and try to
understand what is the specific issue, rather than posting unrelated stack
overflow links. If you don't want to do that, please stop chiming in.


On Fri, Apr 24, 2020 at 1:15 PM Zahid Rahman  wrote:

> https://youtu.be/UEkjRN8jRx4  22:10
>
>
> -  one  option is to reduce flink managed memory from default  70% to may
> be 50%.
>
>  - This error could be caused also  due to missing memory ;
>
> - maintaining a local list by programmer so over using user allocated
> memory caused by heavy processing ;
>
>  - or using a small jvm ,
>
> - Or JVM  spends too much time on gc.
>
> Out of memory has nothing to do flink or flink is not at fault.
>
>
> This process is known as "pimping" flink.
>
> also part of pimping is use to use local disk for memory spill.
>
> On Fri, 24 Apr 2020, 03:53 Xintong Song,  wrote:
>
>> @Stephan,
>> I don't think so. If JVM hits the direct memory limit, you should see the
>> error message "OutOfMemoryError: Direct buffer memory".
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:
>>
>>> @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
>>> limit here?
>>> Would increasing the "taskmanager.memory.framework.off-heap.size" help?
>>>
>>> On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
>>> wrote:
>>>
 As you can see from the task manager tab of flink web dashboard

 Physical Memory:3.80 GB
 JVM Heap Size:1.78 GB
 Flink Managed Memory:128 MB

 *Flink is only using 128M MB which can easily cause OOM*
 *error.*

 *These are DEFAULT settings.*

 *I dusted off an old laptop so it only 3.8 GB RAM.*

 What does your job metrics say  ?

 On Mon, 20 Apr 2020, 07:26 Xintong Song,  wrote:

> Hi Lasse,
>
> From what I understand, your problem is that JVM tries to fork some
> native process (if you look at the exception stack the root exception is
> thrown from a native method) but there's no enough memory for doing that.
> This could happen when either Mesos is using cgroup strict mode for memory
> control, or there's no more memory on the machine. Flink cannot prevent
> native processes from using more memory. It can only reserve certain 
> amount
> of memory for such native usage when requesting worker memory from the
> deployment environment (in your case Mesos) and allocating Java heap /
> direct memory.
>
> My suggestion is to try increasing the JVM overhead configuration. You
> can leverage the configuration options
> 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
> the documentation[1].
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max
>
> On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman 
> wrote:
>
>>
>> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
>> lassenedergaardfl...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> We have migrated to Flink 1.10 and face out of memory exception and
>>> hopeful can someone point us in the right direction.
>>>
>>> We have a job that use broadcast state, and we sometimes get out
>>> memory when it creates a savepoint. See stacktrack below.
>>> We have assigned 2.2 GB/task manager and
>>> configured  taskmanager.memory.process.size : 2200m
>>> In Flink 1.9 our container was terminated because OOM, so 1.10 do a
>>> better job, but it still not working and the task manager is leaking mem
>>> for each OOM and finial kill by Mesos
>>>
>>>
>>> Any idea what we can do to figure out what settings we need to
>>> change?
>>>
>>> Thanks in advance
>>>
>>> Lasse Nedergaard
>>>
>>>
>>> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory -
>>> Could not close the state stream for
>>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>>> java.io.IOException: Cannot allocate memory at
>>> java.io.FileOutputStream.writeBytes(Native Method) at
>>> java.io.FileOutputStream.write(FileOut

Re: Fault tolerance in Flink file Sink

2020-04-24 Thread Dawid Wysakowicz
Hi Eyal,

First of all, I would say a local filesystem is not the right choice for
what you are trying to achieve. I don't think you can achieve true
exactly-once guarantees in this setup. Let me elaborate why.

Let me clarify a bit how the StreamingFileSink works. The interesting
bit is how it behaves on checkpoints. The behavior is controlled by a
RollingPolicy. As you have not said which format you use, let's assume a
row format first. For a row format the default rolling policy (which decides
when to change a file from in-progress to pending) rolls a file if
it reaches 128 MB, if it is older than 60 sec, or if it has not been
written to for 60 sec. It does not roll on a checkpoint. Moreover, the
StreamingFileSink considers the filesystem a durable sink that can be
accessed after a restore. That implies that it will try to append to
this file when restoring from a checkpoint/savepoint.
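
(For reference, a sketch of a row-format sink with those defaults set explicitly; not Eyal's exact job, and the builder method names may differ slightly between Flink versions.)

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("file:///data/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withMaxPartSize(128 * 1024 * 1024)   // roll when a part file reaches 128 MB
            .withRolloverInterval(60_000)          // roll files older than 60 s
            .withInactivityInterval(60_000)        // roll files not written to for 60 s
            .build())
    .build();

// `stream` is the DataStream<String> being written.
stream.addSink(sink);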

Even if you rolled the files on every checkpoint, you still might face
the problem that you have some leftovers, because the
StreamingFileSink moves the files from pending to complete after the
checkpoint is completed. If a failure happens between finishing the
checkpoint and moving the files, it will not be able to move them after a
restore (it would do so if it had access).

Lastly, a completed checkpoint will contain the offsets of records that were
processed successfully end-to-end, that is, records that are assumed
committed by the StreamingFileSink. These can be records written to an
in-progress file with a pointer in the StreamingFileSink's checkpointed
metadata, records in a "pending" file with an entry in the
StreamingFileSink's checkpointed metadata that this file has been
completed, or records in "finished" files.[1]

Therefore as you can see there are multiple scenarios when the
StreamingFileSink has to access the files after a restart.

One last thing: you mentioned "committing to the bootstrap-server".
Bear in mind that Flink does not use offsets committed back to Kafka for
guaranteeing consistency. It can write those offsets back, but only for
monitoring/debugging purposes. Flink stores/restores the processed
offsets from its checkpoints. [3]
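A tiny sketch of the corresponding consumer switch (1.10-era connector API;
topic, schema and properties are placeholders): it only controls what external
offset monitoring sees, while recovery always uses Flink's own checkpointed
offsets.

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "my-group")

    val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)
    // committed back to Kafka only for monitoring/debugging, not for recovery
    consumer.setCommitOffsetsOnCheckpoints(true)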

Let me know if it helped. I tried my best ;) BTW I highly encourage
reading the linked sources as they try to describe all that in a more
structured way.

I am also cc'ing Kostas, who knows more about the StreamingFileSink than
I do, so he can correct me where needed.

 Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

[3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

On 23/04/2020 12:11, Eyal Pe'er wrote:
>
> Hi all,
> I am using Flink streaming with Kafka consumer connector
> (FlinkKafkaConsumer) and file Sink (StreamingFileSink) in a cluster
> mode with exactly once policy.
>
> The file sink writes the files to the local disk.
>
> I’ve noticed that if a job fails and automatic restart is on, the task
> managers look for the leftover files from the last failing job
> (hidden files).
>
> Obviously, since the tasks can be assigned to different task managers,
> this sums up to more failures over and over again.
>
> The only solution I found so far is to delete the hidden files and
> resubmit the job.
>
> If I get it right (and please correct me if I am wrong), the events in
> the hidden files were not committed to the bootstrap-server, so there
> is no data loss.
>
>  
>
> Is there a way, forcing Flink to ignore the files that were written
> already? Or maybe there is a better way to implement the solution
> (maybe somehow with savepoints)?
>
>  
>
> Best regards
>
> Eyal Peer
>
>  
>


signature.asc
Description: OpenPGP digital signature


Re: Fault tolerance in Flink file Sink

2020-04-24 Thread Dawid Wysakowicz
Forgot to cc Kostas

On 23/04/2020 12:11, Eyal Pe'er wrote:
>
> Hi all,
> I am using Flink streaming with Kafka consumer connector
> (FlinkKafkaConsumer) and file Sink (StreamingFileSink) in a cluster
> mode with exactly once policy.
>
> The file sink writes the files to the local disk.
>
> I’ve noticed that if a job fails and automatic restart is on, the task
> managers look for the leftovers files from the last failing job
> (hidden files).
>
> Obviously, since the tasks can be assigned to different task managers,
> this sums up to more failures over and over again.
>
> The only solution I found so far is to delete the hidden files and
> resubmit the job.
>
> If I get it right (and please correct me If I wrong), the events in
> the hidden files were not committed to the bootstrap-server, so there
> is no data loss.
>
>  
>
> Is there a way, forcing Flink to ignore the files that were written
> already? Or maybe there is a better way to implement the solution
> (maybe somehow with savepoints)?
>
>  
>
> Best regards
>
> Eyal Peer
>
>  
>


signature.asc
Description: OpenPGP digital signature


Re: Flink Forward 2020 Recorded Sessions

2020-04-24 Thread Sivaprasanna
Cool. Thanks for the information.

On Fri, 24 Apr 2020 at 11:20 AM, Marta Paes Moreira 
wrote:

> Hi, Sivaprasanna.
>
> The talks will be up on Youtube sometime after the conference ends.
>
> Today, the starting schedule is different (9AM CEST / 12:30PM IST / 3PM
> CST) and more friendly to Europe, India and China. Hope you manage to join
> some sessions!
>
> Marta
>
> On Fri, 24 Apr 2020 at 06:58, Sivaprasanna 
> wrote:
>
>> Hello,
>>
>> I had registered for Flink Forward 2020 and attended a couple of
>> sessions, but due to the odd timings and overlapping sessions in the same
>> slot, I wasn't able to attend some interesting talks. I have received mails
>> with links to rewatch some 2-3 webinars, but not all of those that have happened yet.
>> Where can I find the recorded sessions?
>>
>> Thanks,
>> Sivaprasanna
>>
>


JDBC error on numeric conversion (because of DecimalType MIN_PRECISION)

2020-04-24 Thread Flavio Pompermaier
Hi to all,
I was doing a test against Postgres and I get an error because the JDBC
connector tries to create a DecimalType with precision 0 (the minimum is 1).
Should DecimalType.MIN_PRECISION be lowered to 0, or should the NUMERIC type
of JDBC tables be mapped in some other way?

The table was created with the following statement (the test fails on
creating the ord_qty field):

CREATE TABLE orders(
ord_no integer PRIMARY KEY,
ord_date date,
item_name character(35),
item_grade character(1),
ord_qty numeric,
ord_amount numeric,
CONSTRAINT unq_ordno_itname UNIQUE(ord_qty,ord_amount)
);

Best,
Flavio
-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
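
A hedged side note for archive readers: the failure comes from the
unconstrained NUMERIC columns, for which Postgres reports precision 0 through
the JDBC metadata. Until the connector handles that case, declaring an
explicit precision/scale in the DDL is a possible workaround (the values are
only an example, pick ones that fit your data):

    ord_qty numeric(18, 4),
    ord_amount numeric(18, 4),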


[ANNOUNCE] Development progress of Apache Flink 1.11

2020-04-24 Thread Piotr Nowojski
Hi community,

It has been more than 6 weeks since the previous announcement and as we are
approaching the expected feature freeze we would like to share the Flink
1.11 status update with you.

Initially we were aiming for the feature freeze to happen in late April
(now), however it was recently proposed to be postponed by a couple of
weeks to mid May. [0]

A lot of people in the community are working hard to complete promised
features and there is good progress. We even have managed to already
complete a couple of features. We have updated the features list from the
previous announcement and we have highlighted features that are already
done and also the features that are no longer aimed for Flink 1.11 release
and will be most likely postponed to a later date.

Your release managers,
Zhijiang & Piotr Nowojski

Features already done and ready for Flink 1.11:

   - PyFlink
      - FLIP-96: Add Python ML API [54]
      - FLINK-14500: Fully support all kinds of Python UDF [55]
   - Runtime
      - FLIP-67: Support for cluster partitions [20]
      - FLIP-92: Add N-Ary input stream operator in Flink [24]
      - [FLINK-10742] Let Netty use Flink's buffers on downstream side [28]
      - [FLINK-15911][FLINK-15154] Support Flink work over NAT [39]
      - [FLINK-15672] Switch to Log4j2 by default [34]

Features not targeted for Flink 1.11 anymore:

   - SQL / Table
      - FLIP-91: Introduce SQL client gateway and provide JDBC driver [4]
      - FLIP-107: Reading table columns from different parts of source records [7]
   - ML / Connectors
      - FLIP-72: Pulsar source / sink / catalog [49]
      - Update ML Pipeline API interface to better support Flink ML lib algorithms
   - PyFlink
      - Support running python UDF in docker workers
   - Runtime
      - [FLINK-15786] Use the separated classloader to load connectors' jar [37]
      - Calculate required shuffle memory before allocating slots
   - State Backend
      - Support getCustomizedState in KeyedStateStore [47]

Features still in progress for Flink 1.11:

   - SQL / Table
      - FLIP-65: New type inference for Table API UDFs [2]
      - FLIP-84: Improve TableEnv's interface [3]
      - FLIP-93: Introduce JDBC catalog and Postgres catalog [5]
      - FLIP-105: Support to interpret and emit changelog in Flink SQL [6]
      - [FLINK-14807] Add Table#collect API for fetching data [8]
      - Support query and table hints
   - ML / Connectors
      - FLIP-27: New source API [9]
      - [FLINK-15670] Wrap a source/sink pair to persist intermediate result for subgraph failure recovery [10]
   - PyFlink
      - FLIP-106, FLIP-114: Expand the usage scope of Python UDF [12][50]
      - FLIP-112: Debugging and monitoring of Python UDF [11]
      - FLIP-97, FLIP-120: Integration with most popular Python libraries (Pandas) [51][52]
      - FLIP-121: Performance improvements of Python UDF [53]
   - Web UI
      - FLIP-98: Better back pressure detection [13]
      - FLIP-99: Make max exception configurable [14]
      - FLIP-100: Add attempt information [15]
      - FLIP-102: Add more metrics to TaskManager [16]
      - FLIP-103: Better TM/JM log display [17]
      - [FLINK-14816] Add thread dump feature for TaskManager [18]
   - Runtime
      - FLIP-56: Support for dynamic slots on the TaskExecutor [19]
      - FLIP-76: Unaligned checkpoints [21]
      - FLIP-83: Flink e2e performance testing framework [22]
      - FLIP-85: Support cluster deploy mode [23]
      - FLIP-108: Add GPU to the resource management (specifically for UDTF & UDF) [25]
      - FLIP-111: Consolidate docker images [26]
      - FLIP-116: Unified memory configuration for JobManager [56]
      - [FLINK-9407] ORC format for StreamingFileSink [27]
      - [FLINK-10934] Support per-job mode for Kubernetes integration [29]
      - [FLINK-11395] Avro writer for StreamingFileSink [30]
      - [FLINK-11427] Protobuf parquet writer for StreamingFileSink [31]
      - [FLINK-11499] Extend StreamingFileSink BulkFormats to support arbitrary roll policies [32]
      - [FLINK-14106] Make SlotManager pluggable [33]
      - [FLINK-15674] Consolidate Java and Scala type extraction stack [35]
      - [FLINK-15679] Improve Flink's ID system [36]
      - [FLINK-15788] Various Kubernetes improvements [38]
      - [FLINK-16408] Bind user code class loader to lifetime of a slot [40]
      - [FLINK-16428] Network memory management for backpressure [41]
      - [FLINK-1643

Re: Question about Scala Case Class and List in Flink

2020-04-24 Thread Averell
Hi Timo,

This is my case class:
case class Box[T](meta: Metadata, value: T) {
  def function1: A => B = {...}
  def method2(...): A = {...}
}

However, I still get that warning: "Class class data.package$Box cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance."

I imported org.apache.flink.streaming.api.scala._ -- is this enough to
tell Flink that I am using the Scala API?

Thanks and regards,
Averell
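
For archive readers, a minimal sketch of one common cause and fix (class and
type names below are illustrative): the warning usually means Flink's Java
type extraction analysed the class, which happens when no implicit Scala
TypeInformation is available at the point where the generic parameter is
still unresolved. Making the TypeInformation explicit for the concrete type
avoids the GenericType/Kryo fallback:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._

    case class Metadata(id: Long)
    case class Box[T](meta: Metadata, value: T)

    // T must be concrete where the implicit is derived; inside a generic
    // helper method you would instead require an implicit TypeInformation[T].
    implicit val boxInfo: TypeInformation[Box[String]] =
      createTypeInformation[Box[String]]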



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: K8s native - checkpointing to S3 with RockDBStateBackend

2020-04-24 Thread Averell
Thank you Yun Tang.
Building my own docker image as suggested solved my problem.

However, I don't understand why I need that when I already had that
s3-hadoop jar included in my uber jar.

Thanks.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [ANNOUNCE] Development progress of Apache Flink 1.11

2020-04-24 Thread Till Rohrmann
Thanks for the update Piotr.

Cheers,
Till

On Fri, Apr 24, 2020 at 4:42 PM Piotr Nowojski  wrote:

> Hi community,
>
> It has been more than 6 weeks since the previous announcement and as we are
> approaching the expected feature freeze we would like to share the Flink
> 1.11 status update with you.
>
> Initially we were aiming for the feature freeze to happen in late April
> (now), however it was recently proposed to be postponed by a couple of
> weeks to mid May. [0]
>
> A lot of people in the community are working hard to complete promised
> features and there is good progress. We even have managed to already
> complete a couple of features. We have updated the features list from the
> previous announcement and we have highlighted features that are already
> done and also the features that are no longer aimed for Flink 1.11 release
> and will be most likely postponed to a later date.
>
> Your release managers,
> Zhijiang & Piotr Nowojski
>
> Features already done and ready for Flink 1.11
>
>-
>
>PyFlink
>-
>
>   FLIP-96: Add Python ML API [54]
>   -
>
>   FLINK-14500: Fully support all kinds of Python UDF [55]
>   -
>
>Runtime
>-
>
>   FLIP-67: Support for cluster partitions [20]
>   -
>
>   FLIP-92: Add N-Ary input stream operator in Flink [24]
>   -
>
>   [FLINK-10742] Let Netty use Flink's buffers on downstream side [28]
>   -
>
>   [FLINK-15911][FLINK-15154] Support Flink work over NAT [39]
>   -
>
>   [FLINK-15672] Switch to Log4j2 by default [34]
>
>
> Features not targeted for Flink 1.11 anymore:
>
>-
>
>SQL / Table
>-
>
>   FLIP-91 Introduce SQL client gateway and provide JDBC driver [4]
>   -
>
>   FLIP-107: Reading table columns from different parts of source
>   records  [7]
>
>
>-
>
>ML / Connectors
>
>
>-
>
>   FLIP-72: Pulsar source / sink / catalog [49]
>   -
>
>   Update ML Pipeline API interface to better support Flink ML lib
>   algorithms
>   -
>
>PyFlink
>-
>
>   Support running python UDF in docker workers
>   -
>
>Runtime
>-
>
>   [FLINK-15786] Use the separated classloader to load connectors’ jar
>   [37]
>
>
>-
>
>   Calculate required shuffle memory before allocating slots
>   -
>
>State Backend:
>-
>
>   Support getCustomizedState in KeyedStateStore [47]
>
>
> Features still in progress for Flink 1.11:
>
>-
>
>SQL / Table
>-
>
>   FLIP-65: New type inference for Table API UDFs [2]
>   -
>
>   FLIP-84: Improve TableEnv’s interface [3]
>   -
>
>   FLIP-93: Introduce JDBC catalog and Postgres catalog [5]
>   -
>
>   FLIP-105: Support to interpret and emit changelog in Flink SQL [6]
>   -
>
>   [FLINK-14807] Add Table#collect API for fetching data [8]
>   -
>
>   Support query and table hints
>
>
>-
>
>ML / Connectors
>-
>
>   FLIP-27: New source API [9]
>   -
>
>   [FLINK-15670] Wrap a source/sink pair to persist intermediate result
>   for subgraph failure recovery [10]
>
>
>-
>
>PyFlink
>-
>
>   FLIP-106, FLIP-114: Expand the usage scope of Python UDF [12][50]
>   -
>
>   FLIP-112: Debugging and monitoring of Python UDF [11]
>   -
>
>   FLIP-97, FLIP-120
>   <
> https://docs.google.com/document/d/1rUZHxS7rguLi4oJNEAu6xcRJcW7ldIxAdewoIxoO5w8/edit#heading=h.ghlv7e457i4
> >:
>   Integration with most popular Python libraries (Pandas) [51][52]
>   -
>
>   FLIP-121 Performance improvements of Python UDF [53]
>   -
>
>Web UI
>-
>
>   FLIP-98: Better back pressure detection [13]
>   -
>
>   FLIP-99: Make max exception configurable [14]
>   -
>
>   FLIP-100: Add attempt information [15]
>   -
>
>   FLIP-102: Add more metrics to TaskManager [16]
>   -
>
>   FLIP-103: Better TM/JM log display [17]
>   -
>
>   [FLINK-14816] Add thread dump feature for TaskManager [18]
>   -
>
>Runtime
>-
>
>   FLIP-56: Support for dynamic slots on the TaskExecutor [19]
>   -
>
>   FLIP-76: Unaligned checkpoints [21]
>   -
>
>   FLIP-83: Flink e2e performance testing framework [22]
>   -
>
>   FLIP-85: Support cluster deploy mode [23]
>   -
>
>   FLIP-108: Add GPU to the resource management (specifically for UDTF &
>   UDF) [25]
>   -
>
>   FLIP-111: Consolidate docker images [26]
>   -
>
>   FLIP-116: Unified memory configuration for JobManager [56]
>   -
>
>   [FLINK-9407] ORC format for StreamingFileSink [27]
>   -
>
>   [FLINK-10934] Support per-job mode for Kubernetes integration [29]
>   -
>
>   [FLINK-11395] Avro writer for StreamingFileSink [30]
>   -
>
>   [FLINK-11427] Protobuf parquet writer for StreamingFileSink [31]
>   -
>
>   [FLINK-11499] Extend StreamingFileSink BulkFormats to support
>   arbitrary roll policie

checkpointing opening too many file

2020-04-24 Thread ysnakie






Hi everyone,

We have a Flink job that writes files to different directories on HDFS. It
opens many files due to its high parallelism, and I also found that with the
RocksDB state backend even more files are open during checkpointing. We use
YARN to schedule the Flink job, but YARN always schedules the TaskManagers
onto the same machine and I cannot control it, so that DataNode comes under
very high pressure and keeps throwing "bad link" errors. We have already
increased the xcievers limit of HDFS to 16384.

Any idea how to solve this problem? Either by reducing the number of open
files or by controlling the YARN scheduling to put TaskManagers on different
machines.

Thank you very much!
Regards,
Shengnan





Re: K8s native - checkpointing to S3 with RockDBStateBackend

2020-04-24 Thread David Magalhães
I think the classloaders for the uber jar and for Flink itself are different.
Not sure if this is the right explanation, but that is why you need to
add flink-s3-fs-hadoop inside the plugins folder in the cluster.
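
A minimal sketch of the custom image Yun Tang suggested (version and paths
are illustrative for a 1.10.x image). Filesystem plugins are loaded by the
framework with a dedicated classloader at startup, so a filesystem that only
lives in the user uber jar is typically not picked up for checkpoint paths:

    FROM flink:1.10.0
    RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
        cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/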

On Fri, Apr 24, 2020 at 4:07 PM Averell  wrote:

> Thank you Yun Tang.
> Building my own docker image as suggested solved my problem.
>
> However, I don't understand why I need that while I already had that
> s3-hadoop jar included in my uber jar?
>
> Thanks.
> Regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


RE: History Server Not Showing Any Jobs - File Not Found?

2020-04-24 Thread Hailu, Andreas
I'm having a further look at the code in HistoryServerStaticFileServerHandler - 
is there an assumption about where overview.json is supposed to be located?

// ah
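
For context while digging: the History Server downloads the archives from
historyserver.archive.fs.dir into a local working directory, and the static
file handler serves /jobs/overview.json from that local directory, so that
directory is worth checking too. A configuration sketch (paths are examples,
the interval is in milliseconds):

    jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
    historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
    historyserver.web.tmpdir: /tmp/flink-history-server/
    historyserver.archive.fs.refresh-interval: 10000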

From: Hailu, Andreas [Engineering]
Sent: Wednesday, April 22, 2020 1:32 PM
To: 'Chesnay Schepler' ; Hailu, Andreas [Engineering] 
; user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We're using Flink 1.9.1. I enabled DEBUG 
level logging and this is something relevant I see:

2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - Connecting to datanode 10.79.252.101:1019
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, 
remoteHostTrusted = false
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL client skipping handshake in secured 
configuration with privileged port for addr = /10.79.252.101, datanodeId = 
DatanodeI
nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]
2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - DFSInputStream has been closed already
2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader
2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG 
Client$Connection$3 - IPC Client (1578587450) connection to 
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com 
sending #1391

Aside from that, it looks like a lot of logging around datanodes and block 
location metadata. Did I miss something in my classpath, perhaps? If so, do you 
have a suggestion on what I could try?

// ah

From: Chesnay Schepler mailto:ches...@apache.org>>
Sent: Wednesday, April 22, 2020 2:16 AM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?
Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:
Hi,

I'm trying to set up the History Server, but none of my applications are 
showing up in the Web UI. Looking at the console, I see that all of the calls 
to /overview return the following 404 response: {"errors":["File not found."]}.

I've set up my configuration as follows:

JobManager Archive directory:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs
Found 44282 items
-rw-r-   3 delp datalake_admin_dev  50569 2020-03-21 23:17 
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936
-rw-r-   3 delp datalake_admin_dev  49578 2020-03-03 08:45 
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5
-rw-r-   3 delp datalake_admin_dev  50842 2020-03-24 15:19 
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757
...
...

History Server will fetch the archived jobs from the same location:
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

So I'm able to confirm that there are indeed archived applications that I 
should be able to view in the history server. I'm not able to find out what file 
the overview service is looking for from the repository - any suggestions as to 
what I could look into next?

Best,
Andreas



Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices





Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re: Flink 1.10 Out of memory

2020-04-24 Thread Zahid Rahman
> " a simple Stack Overflow search yields.
"
Actually it wasn't Stack Overflow but a video I saw presented by Robert
Metzger of the Apache Flink org.

Your mind  must have been fogged up
with another thought of another email not the contents of my email clearly.

He explained  the very many solutions to the out of memory error.

Obviously I cant dig any deeper unless I have the code in front of me
loaded into an IDE.

for example I came across flink archetypes from Alibaba etc. in Eclipse.

I got  every conceivable possible error I have never seen before.

I used google and StackOverFlow  to solve each error.

It took me about 6 hours but I finally have those archetypes working now.

Also I noticed flakey behaviour from IntelliJ when using flink examples
provided from git hub.

so I loaded the same Flink examples into Eclipse and NetBeans and saw that the
flakey behaviour was not present.

I concluded that flakey behaviour was due to intelliJ so I am continuing to
spend time on Flink and haven't deleted it yet.

I can replicate the IntelliJ  flakey behaviour for the right price.


That is software development as I understand it.

Obviously you have different views that you can debug using emailing list.

Unlike you that  skill of software debugging by email I do not have so I
will not  "chime" any more. Nor can I read the mind of another on what is
his skill level or product framework familiarity.

You can have all the glory of chiming.

But do keep in mind it was a YouTube video and not  StackOverFlow which is
mostly a text based website where other people  who are self reliant use it
to address buggy software.

I am quite happy to use the crying pains of others before me on stack
overflow to resolve the same software bugs.

It is my view that stack overflow is a partner program with Apache
frameworks . How did we develop software before google or StackOverFlow  or
mailing lists ?

I would have to say it was with comprehensive product documents and
makeuse.org of  software development tools. Mainly an understanding that
software development is a tight binding of traceable logic flow.

Absolutely no magic except in the case of intermittent error may be.

That was a long-winded personal attack, so this is a long-winded explanation.


I too am a member of a soon to be extinct tribe , can I be apache too  ?

Happy  Hunting of Apaches  :).
















On Fri, 24 Apr 2020, 13:54 Stephan Ewen,  wrote:

> @zahid I would kindly ask you to rethink you approach to posting here.
> Wanting to help answering questions is appreciated, but what you post is
> always completely disconnected from the actual issue.
>
> The questions here usually go beyond the obvious and beyond what a simple
> Stack Overflow search yields. That's why they are posted here and the
> person asking is not simply searching Stack Overflow themselves.
>
> So please either make the effort to really dig into the problem and try to
> understand what is the specific issue, rather than posting unrelated stack
> overflow links. If you don't want to do that, please stop chiming in.
>
>
> On Fri, Apr 24, 2020 at 1:15 PM Zahid Rahman  wrote:
>
>> https://youtu.be/UEkjRN8jRx4  22:10
>>
>>
>> -  one  option is to reduce flink managed memory from default  70% to may
>> be 50%.
>>
>>  - This error could be caused also  due to missing memory ;
>>
>> - maintaining a local list by programmer so over using user allocated
>> memory caused by heavy processing ;
>>
>>  - or using a small jvm ,
>>
>> - Or JVM  spends too much time on gc.
>>
>> Out of memory has nothing to do flink or flink is not at fault.
>>
>>
>> This process is known as "pimping" flink.
>>
>> also part of pimping is use to use local disk for memory spill.
>>
>> On Fri, 24 Apr 2020, 03:53 Xintong Song,  wrote:
>>
>>> @Stephan,
>>> I don't think so. If JVM hits the direct memory limit, you should see
>>> the error message "OutOfMemoryError: Direct buffer memory".
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:
>>>
 @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
 limit here?
 Would increasing the "taskmanager.memory.framework.off-heap.size" help?

 On Mon, Apr 20, 2020 at 11:02 AM Zahid Rahman 
 wrote:

> As you can see from the task manager tab of flink web dashboard
>
> Physical Memory:3.80 GB
> JVM Heap Size:1.78 GB
> Flink Managed Memory:128 MB
>
> *Flink is only using 128M MB which can easily cause OOM*
> *error.*
>
> *These are DEFAULT settings.*
>
> *I dusted off an old laptop so it only 3.8 GB RAM.*
>
> What does your job metrics say  ?
>
> On Mon, 20 Apr 2020, 07:26 Xintong Song, 
> wrote:
>
>> Hi Lasse,
>>
>> From what I understand, your problem is that JVM tries to fork some
>> native process (if you look at the exception stack the root exception is
>> thrown from a native method) but there's no enou

Re: JDBC error on numeric conversion (because of DecimalType MIN_PRECISION)

2020-04-24 Thread Flavio Pompermaier
I think I hit the same problem as SPARK-26538 (
https://github.com/apache/spark/pull/23456).
I've handled the case in the same manner in my PR for FLINK-17356 (
https://github.com/apache/flink/pull/11906)
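
For readers who land on this thread, a rough sketch of the kind of fallback
SPARK-26538 applied (illustrative only, not the actual FLINK-17356 change):
when the driver reports precision 0 for an unconstrained NUMERIC, fall back
to a wide default decimal instead of failing validation:

    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.types.DataType
    import org.apache.flink.table.types.logical.DecimalType

    // sketch: map JDBC NUMERIC metadata to a Flink DataType
    def mapNumeric(precision: Int, scale: Int): DataType =
      if (precision == 0) DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18) // unconstrained NUMERIC
      else DataTypes.DECIMAL(precision, scale)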

On Fri, Apr 24, 2020 at 4:28 PM Flavio Pompermaier 
wrote:

> Hi to all,
> I was doing a test against Postgres and I get an error because the jdbc
> connect tries to create a DecimalType with precision 0 (min = 1).
> Should this DecimalType.MIN_PRECISION lowered to 0 or should NUMERIC type
> of jdbc tables mapped in some other way?
>
> The Table was created with the following statement (the test fails on
> creating ord_qty filed):
>
> CREATE TABLE orders(
> ord_no integer PRIMARY KEY,
> ord_date date,
> item_name character(35),
> item_grade character(1),
> ord_qty numeric,
> ord_amount numeric,
> CONSTRAINT unq_ordno_itname UNIQUE(ord_qty,ord_amount)
> );
>
> Best,
> Flavio
>


Re: Flink 1.10 Out of memory

2020-04-24 Thread Som Lima
@Zahir

what the Apache means is don't  be like Jesse Anderson who recommended
Flink on the basis Apache only uses  maps as seen in video.

While Flink uses ValueState and State in Streaming API.

Although it appears Jesse Anderson only looked as deep as the data stream
helloworld.
You are required to think and look deeper.

https://youtu.be/sYlbD_OoHhs

Watch "Airbus makes more of the sky with Flink - Jesse Anderson & Hassene
Ben Salem" on YouTube


https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/walkthroughs/datastream_api.html



On Fri, 24 Apr 2020, 17:38 Zahid Rahman,  wrote:

> > " a simple Stack Overflow search yields.
> "
> Actually it wasn't stack overflow but  a video I saw presented by Robert
> Metzger. of Apache Flink org.
>
> Your mind  must have been fogged up
> with another thought of another email not the contents of my email clearly.
>
> He explained  the very many solutions to the out of memory error.
>
> Obviously I cant dig any deeper unless I have the code in front of me
> loaded into an IDE.
>
> for example I came across flink archetypes from Alibaba etc. in Eclipse.
>
> I got  every conceivable possible error I have never seen before.
>
> I used google and StackOverFlow  to solve each error.
>
> It took me about 6 hours but I finally have those archetypes working now.
>
> Also I noticed flakey behaviour from IntelliJ when using flink examples
> provided from git hub.
>
> so I loaded same flink examples into Eclipse and and NetBeans  saw same
> flakey behaviour was not present.
>
> I concluded that flakey behaviour was due to intelliJ so I am continuing
> to spend time on Flink and haven't deleted it yet.
>
> I can replicate the IntelliJ  flakey behaviour for the right price.
>
>
> That is software development as I understand it.
>
> Obviously you have different views that you can debug using emailing list.
>
> Unlike you that  skill of software debugging by email I do not have so I
> will not  "chime" any more. Nor can I read the mind of another on what is
> his skill level or product framework familiarity.
>
> You can have all the glory of chiming.
>
> But do keep in mind it was a YouTube video and not  StackOverFlow which is
> mostly a text based website where other people  who are self reliant use it
> to address buggy software.
>
> I am quite happy to use the crying pains of others before me on stack
> overflow to resolve the same software bugs.
>
> It is my view that stack overflow is a partner program with Apache
> frameworks . How did we develop software before google or StackOverFlow  or
> mailing lists ?
>
> I would have to say it was with comprehensive product documents and
> makeuse.org of  software development tools. Mainly an understanding that
> software development is tight binding of teaceable logic flow.
>
> Absolutely no magic except in the case of intermittent error may be.
>
> That was aong winded personal attack so this is a long winded
> explanation.
>
>
> I too am a member of a soon to be extinct tribe , can I be apache too  ?
>
> Happy  Hunting of Apaches  :).
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Fri, 24 Apr 2020, 13:54 Stephan Ewen,  wrote:
>
>> @zahid I would kindly ask you to rethink you approach to posting here.
>> Wanting to help answering questions is appreciated, but what you post is
>> always completely disconnected from the actual issue.
>>
>> The questions here usually go beyond the obvious and beyond what a simple
>> Stack Overflow search yields. That's why they are posted here and the
>> person asking is not simply searching Stack Overflow themselves.
>>
>> So please either make the effort to really dig into the problem and try
>> to understand what is the specific issue, rather than posting unrelated
>> stack overflow links. If you don't want to do that, please stop chiming in.
>>
>>
>> On Fri, Apr 24, 2020 at 1:15 PM Zahid Rahman 
>> wrote:
>>
>>> https://youtu.be/UEkjRN8jRx4  22:10
>>>
>>>
>>> -  one  option is to reduce flink managed memory from default  70% to
>>> may be 50%.
>>>
>>>  - This error could be caused also  due to missing memory ;
>>>
>>> - maintaining a local list by programmer so over using user allocated
>>> memory caused by heavy processing ;
>>>
>>>  - or using a small jvm ,
>>>
>>> - Or JVM  spends too much time on gc.
>>>
>>> Out of memory has nothing to do flink or flink is not at fault.
>>>
>>>
>>> This process is known as "pimping" flink.
>>>
>>> also part of pimping is use to use local disk for memory spill.
>>>
>>> On Fri, 24 Apr 2020, 03:53 Xintong Song,  wrote:
>>>
 @Stephan,
 I don't think so. If JVM hits the direct memory limit, you should see
 the error message "OutOfMemoryError: Direct buffer memory".

 Thank you~

 Xintong Song



 On Thu, Apr 23, 2020 at 6:11 PM Stephan Ewen  wrote:

> @Xintong and @Lasse could it be that the JVM hits the "Direct Memory"
> limit here?
> Would increasing the "taskmanager.memory.framework.off-

ArrayIndexoutofBoundsException.

2020-04-24 Thread Zahid Rahman
@Stephan Ewen.

That was the other response I gave.

I have thought about it really hard as per your request.


Dr. NOUREDDIN SADAWI
Shows how to handle that exception.

https://youtu.be/c7rsWQvpw4k

@ 7:06.


Re: checkpointing opening too many file

2020-04-24 Thread Congxian Qiu
Hi
If there are indeed that many files to upload to HDFS, then currently we
do not have any solution to limit the number of open files. There is an
issue [1] that aims to fix this problem, with a PR attached to it; maybe
you can try that PR and see whether it solves your problem.

[1] https://issues.apache.org/jira/browse/FLINK-11937
Best,
Congxian


ysnakie  于2020年4月24日周五 下午11:30写道:

> Hi everyone
> We have a Flink Job to write files to HDFS's different directories. It
> will open many files due to its high parallelism. I also found that if
> using rocksdb state backend, it will have even more files open during the
> checkpointing.  We use yarn to schedule Flink job. However yarn always
> schedule taskmanagers to the same machine and I cannot control it! So the
> datanode will get very very high pressure and always throw a "bad link"
> error.  We hava already increase the xiceviers limit of HDFS to 16384
>
> Any idea to solve this problem? reduce the number of opening file or
> control the yarn scheduling to put taskmanager on different machines!
>
> Thank you very much!
> regards
>
> Shengnan
>
>


Re: Debug Slowness in Async Checkpointing

2020-04-24 Thread Congxian Qiu
Hi
If the bottleneck is the upload part, have you tried uploading the files
using multiple threads? [1]

[1] https://issues.apache.org/jira/browse/FLINK-11008
Best,
Congxian


Lu Niu  于2020年4月24日周五 下午12:38写道:

> Hi, Robert
>
> Thanks for replying. Yeah. After I added monitoring on the above path, it
> shows the slowness did come from uploading files to S3. Right now I am still
> investigating the issue. At the same time, I am trying PrestoS3FileSystem
> to check whether that can mitigate the problem.
>
> Best
> Lu
>
> On Thu, Apr 23, 2020 at 8:10 AM Robert Metzger 
> wrote:
>
>> Hi Lu,
>>
>> were you able to resolve the issue with the slow async checkpoints?
>>
>> I've added Yu Li to this thread. He has more experience with the state
>> backends to decide which monitoring is appropriate for such situations.
>>
>> Best,
>> Robert
>>
>>
>> On Tue, Apr 21, 2020 at 10:50 PM Lu Niu  wrote:
>>
>>> Hi, Robert
>>>
>>> Thanks for replying. To improve observability , do you think we should
>>> expose more metrics in checkpointing? for example, in incremental
>>> checkpoint, the time spend on uploading sst files?
>>> https://github.com/apache/flink/blob/5b71c7f2fe36c760924848295a8090898cb10f15/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L319
>>>
>>> Best
>>> Lu
>>>
>>>
>>> On Fri, Apr 17, 2020 at 11:31 AM Robert Metzger 
>>> wrote:
>>>
 Hi,
 did you check the TaskManager logs if there are retries by the s3a file
 system during checkpointing?

 I'm not aware of any metrics in Flink that could be helpful in this
 situation.

 Best,
 Robert

 On Tue, Apr 14, 2020 at 12:02 AM Lu Niu  wrote:

> Hi, Flink users
>
> We notice sometimes async checkpointing can be extremely slow, leading
> to checkpoint timeout. For example, For a state size around 2.5MB, it 
> could
> take 7~12min in async checkpointing:
>
> [image: Screen Shot 2020-04-09 at 5.04.30 PM.png]
>
> Notice all the slowness comes from async checkpointing, no delay in
> sync part and barrier assignment. As we use rocksdb incremental
> checkpointing, I notice the slowness might be caused by uploading the file
> to s3. However, I am not completely sure since there are other steps in
> async checkpointing. Does flink expose fine-granular metrics to debug such
> slowness?
>
> setup: flink 1.9.1, rocksdb incremental state backend,
> S3AHaoopFileSystem
>
> Best
> Lu
>