Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-16 Thread Pierre Oberholzer
Hi Dian, Community,

(bringing the thread back to wider audience)

As you suggested, I've tried to use DataTypeHint with Row instead of Map but
also this simple case leads to a type mismatch between UDF and Table API.
I've also tried other Map objects from Flink (table.data.MapData,
flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
(java.util.Map) in combination with DataTypeHint, without success.
N.B. I'm using version 1.11.

Am I doing something wrong, or am I facing limitations in the toolkit?

Thanks in advance for your support!

Best regards,

*Scala UDF*

class dummyMap() extends ScalarFunction {

 @DataTypeHint("ROW")
 def eval(): Row = {

Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }
}

*Table DDL*

my_sink_ddl = f"""
create table mySink (
output_of_my_scala_udf ROW
) with (
...
)
"""

*Error*

Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.ValidationException: Field types of query
result and registered TableSink
`default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf:
GenericType]
TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
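
For reference, a minimal sketch (written in Java, with the row field names s
and t taken from the sink schema in the error above; treat them as
assumptions) of a type hint and DDL column that line up:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Sketch only: the row field names s and t mirror the sink schema above.
public class DummyMap extends ScalarFunction {

  @DataTypeHint("ROW<s STRING, t STRING>")
  public Row eval() {
    return Row.of("foo", "bar");
  }
}

// Matching sink column in the DDL:
//   output_of_my_scala_udf ROW<s STRING, t STRING>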



On Fri, Nov 13, 2020 at 11:59 AM, Pierre Oberholzer 
wrote:

> Thanks Dian, but same error when using explicit returned type:
>
> class dummyMap() extends ScalarFunction {
>
>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>
> val states = Map("key1" -> "val1", "key2" -> "val2")
> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>
>   }
> }
>
> On Fri, Nov 13, 2020 at 10:34 AM, Dian Fu  wrote:
>
>> You need to explicitly define the result type of the UDF. You could refer
>> to [1] for more details if you are using Flink 1.11. If you are using other
>> versions of Flink, you need to refer to the corresponding documentation.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>
>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer  wrote:
>>
>> ScalarFunction
>>
>>
>>
>
> --
> Pierre
>

-- 
Pierre


IllegalStateException Printing Plan

2020-11-16 Thread Rex Fenley
Hello,

I have the following code attempting to print the execution plan for my job
locally. The job runs fine and the Flink UI displays it, so I'd expect this to work.

val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE)
println(s"execution plan:\n${this.env.getExecutionPlan()}")

but instead I end up with

Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot execute.

What am I doing wrong?
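
For reference, a sketch in Java of getting the plan from the Table API itself
rather than from StreamExecutionEnvironment#getExecutionPlan; it assumes the
pipeline is built entirely with the Table API and that tableEnv is the table
environment used to create userDocsTable (an assumption, not shown above):

import org.apache.flink.table.api.StatementSet;

// Plan of the query behind the table (Table#explain exists since Flink 1.11).
System.out.println(userDocsTable.explain());

// Plan of the full INSERT pipeline: add the insert to a StatementSet and
// explain that instead of executing it right away.
StatementSet statements = tableEnv.createStatementSet();
statements.addInsert(SINK_ES_PEOPLE, userDocsTable);
System.out.println(statements.explain());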

-- 
Rex Fenley  |  Software Engineer - Mobile and Backend
Remind.com



Force Join Unique Key

2020-11-16 Thread Rex Fenley
Hello,

I have quite a few joins in my plan that have

leftInputSpec=[NoUniqueKey]

in Flink UI. I know this can't truly be the case that there is no unique
key, at least for some of these joins that I've evaluated.

Is there a way to hint to the join what the unique key is for a table?

Thanks!

-- 
Rex Fenley  |  Software Engineer - Mobile and Backend
Remind.com



Flink on YARN: delegation token expired prevent job restart

2020-11-16 Thread Kien Truong
Hi all,

We are having an issue where the Flink Application Master is unable to
automatically restart the Flink job after its delegation token has expired.

We are using Flink 1.11 with YARN 3.1.1 in single-job-per-yarn-cluster
mode. We have also added a valid keytab configuration, and the taskmanagers
are able to log in with their keytabs correctly. However, it seems the YARN
Application Master still uses delegation tokens instead of the keytab.

Any idea on how to resolve this would be much appreciated.

Thanks
Kien


How to convert Int to Date

2020-11-16 Thread Rex Fenley
Hello,

I'm using the Table API and I have a column which is an integer day since
epoch. According to the docs [1] both `int` and `java.lang.Integer` are
acceptable for DATE. However, if I try to use the SQL API to write that INT
column out as a DATE to the Elasticsearch connector, I receive an
exception. How then should I go about converting to DATE?

Exception:
Caused by: org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink
default_catalog.default_database.sink_es_people do not match.
Query schema: [... column: INT, ...]
Sink schema: [... column: DATE, ...]

I know this column is the culprit because when I make it INT on both ends
it works.

How do I go about making my INT a DATE?

Thanks!

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#date-and-time
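
For reference, one way to express such a conversion is a small scalar UDF; the
sketch below is in Java, and the function and column names are hypothetical,
not taken from the job:

import java.time.LocalDate;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch: converts an epoch-day INT column into a SQL DATE.
public class EpochDayToDate extends ScalarFunction {

  @DataTypeHint("DATE")
  public LocalDate eval(Integer epochDay) {
    return epochDay == null ? null : LocalDate.ofEpochDay(epochDay);
  }
}

// Registration and use (names are placeholders):
//   tableEnv.createTemporarySystemFunction("EPOCH_DAY_TO_DATE", EpochDayToDate.class);
//   INSERT INTO sink_es_people SELECT ..., EPOCH_DAY_TO_DATE(my_int_column), ... FROM src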

-- 
Rex Fenley  |  Software Engineer - Mobile and Backend
Remind.com



Re: Flink State Processor API - Bootstrap One state

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

Using the State Processor API, modifying the state in an existing savepoint
results in a new savepoint (a new directory) with the modified state.
The original savepoint remains intact.
The API allows you to touch only certain operators; all other state does not
need to be touched and remains as is.

Note that when generating a new savepoint from an existing savepoint, the
State Processor API does not perform a deep copy of the untouched state.
Instead, the new savepoint will contain references to the old savepoint for
that untouched state.
Essentially this means that modified savepoints written by the State
Processor API are not self-contained, and you should be careful not to
delete the original savepoints, as that would invalidate the newly generated
one.

Cheers,
Gordon
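
For concreteness, a minimal sketch of such a single-operator rewrite; the
operator uid "o1" and state name "s1" come from the question below, while the
paths, data types, and state backend are placeholders:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class RewriteOperatorO1 {

  // Writes the new value of ValueState "s1" for each incoming key/value pair.
  public static class S1Writer
      extends KeyedStateBootstrapFunction<String, Tuple2<String, String>> {
    private transient ValueState<String> s1;

    @Override
    public void open(Configuration parameters) {
      s1 = getRuntimeContext().getState(new ValueStateDescriptor<>("s1", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> in, Context ctx) throws Exception {
      s1.update(in.f1);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Load the existing savepoint (path and state backend are placeholders).
    ExistingSavepoint existing = Savepoint.load(
        env, "hdfs:///savepoints/savepoint-original", new FsStateBackend("hdfs:///checkpoints"));

    // Replacement data for operator "o1" (hard-coded here for the sketch).
    DataSet<Tuple2<String, String>> newData =
        env.fromElements(Tuple2.of("key1", "val1"), Tuple2.of("key2", "val2"));

    BootstrapTransformation<Tuple2<String, String>> o1State = OperatorTransformation
        .bootstrapWith(newData)
        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
          @Override
          public String getKey(Tuple2<String, String> t) {
            return t.f0;
          }
        })
        .transform(new S1Writer());

    // Replace only operator "o1"; untouched operators are carried over by
    // reference, and the result is written to a NEW savepoint directory.
    existing
        .removeOperator("o1")
        .withOperator("o1", o1State)
        .write("hdfs:///savepoints/savepoint-modified");

    env.execute("rewrite operator o1");
  }
}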


On Tue, Nov 17, 2020 at 2:19 PM ApoorvK 
wrote:

> Currently my Flink application has a state size of 160 GB (around 50
> operators), where a few operators' state is much larger than the rest. I am
> planning to use the State Processor API to bootstrap, let's say, one
> particular state with operator id o1 that contains a ValueState with id s1.
>
> These are the steps I plan to follow:
>
> 1. Take a savepoint of the application; this gives a state folder containing
> the operator data and a metadata file that includes o1 and s1.
>
> 2. Read only that state using the State Processor API (in another Flink
> app), re-write it with the data that I want under the same o1 and s1 ids,
> and copy this folder into the savepoint folder taken in step 1.
>
> 3. Restore the application with the savepoint taken in step 1.
>
> I am doing it this way because I do not want to touch any other state; my
> concern is only with this particular state operator.
>
> Kindly let me know if this is the right way to do it, or whether there is
> any better way to achieve this.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink State Processor API - Bootstrap One state

2020-11-16 Thread ApoorvK
Currently my Flink application has a state size of 160 GB (around 50
operators), where a few operators' state is much larger than the rest. I am
planning to use the State Processor API to bootstrap, let's say, one
particular state with operator id o1 that contains a ValueState with id s1.

These are the steps I plan to follow:

1. Take a savepoint of the application; this gives a state folder containing
the operator data and a metadata file that includes o1 and s1.

2. Read only that state using the State Processor API (in another Flink app),
re-write it with the data that I want under the same o1 and s1 ids, and copy
this folder into the savepoint folder taken in step 1.

3. Restore the application with the savepoint taken in step 1.

I am doing it this way because I do not want to touch any other state; my
concern is only with this particular state operator.

Kindly let me know if this is the right way to do it, or whether there is any
better way to achieve this.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: split avro kafka field

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

1. You'd have to configure your Kafka connector source to use a
DeserializationSchema that deserializes the Kafka record bytes into your
generated Avro type. You can use the shipped `AvroDeserializationSchema` for
that.

2. After your Kafka connector source, you can use a flatMap transformation
to do the splitting.
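
Putting the two steps together, a rough sketch in Java; the Avro class
MyRecord, its getters, the topic, and the split logic are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class SplitAvroFields {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group");

    // 1. Deserialize the Kafka record bytes into the generated Avro type.
    DataStream<MyRecord> records = env.addSource(new FlinkKafkaConsumer<>(
        "my-topic", AvroDeserializationSchema.forSpecific(MyRecord.class), props));

    // 2. Split each record into several output elements in a flatMap.
    DataStream<String> fields = records
        .flatMap((MyRecord r, Collector<String> out) -> {
          out.collect(r.getFieldA());
          out.collect(r.getFieldB());
        })
        .returns(Types.STRING);

    fields.print();
    env.execute("split avro fields");
  }
}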

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka SQL table Re-partition via Flink SQL

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

I'm pulling in some Flink SQL experts (in CC) to help you with this one :)

Cheers,
Gordon

On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra 
wrote:

> Hi,
> I am trying to author a SQL job that repartitions a Kafka SQL table
> into another Kafka SQL table.
> As an example, the input/output tables have exactly the same SQL schema (see
> below) and data; the only difference is that the new Kafka stream needs to be
> repartitioned using a simple projection such as item_id (the input stream is
> partitioned by user_id).
> Is there a way to do this via SQL only, without using
> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?
>
> In other words, how can we express the stream key (keyBy) via the SQL
> layer?
>
> For instance in Hive they expose a system column called  __key or
> __partition that can be used to do this via SQL layer  (see
> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
> )
>
> CREATE TABLE input_kafkaTable (
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior_partition_by_uid',
>  'properties.bootstrap.servers' = 'localhost:9092',
> )
>
> CREATE TABLE output_kafkaTable (
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior_partition_by_iid',
>  'properties.bootstrap.servers' = 'localhost:9092',
> )
>
>
>
> --
>
> B-Slim
> ___/\/\/\___/\/\/\___/\/\/\___/\/\/\___/\/\/\___
>


Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

Both the data and the metadata are stored in the savepoint directory, since
Flink 1.3.
The metadata in the savepoint directory does not reference any checkpoint
data files.

In 1.11, what was changed was that the savepoint metadata uses relative
paths to point to the data files in the savepoint directory, instead of
absolute paths.
This would allow you to treat the savepoint directories as self-contained
and free to relocate with filesystem operations outside of Flink.

Hope this clarifies things for you!

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink 1.11.2 could not create kafka table source on EMR.

2020-11-16 Thread Fanbin Bu
Hi,

I could not launch my Flink 1.11.2 application on EMR; it fails with the exception

Caused by: org.apache.flink.table.api.ValidationException:
Could not find any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.

I attached the full log at the end. I checked some other threads, but none
applies in my case. Here are my observations:

1. dependency check: both flink-connector-kafka and flink-json are included
in the final fat jar.
2.
resources/META-INF/services/org.apache.flink.table.factories.TableFactory
has the following and is included in the final fat jar.
  - org.apache.flink.formats.json.JsonRowFormatFactory
  - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  I also noticed that only the identifier datagen is shown in the log; no
kafka or json entries are in there.
3. local IntelliJ running fine.
4. same jar on EMR not working

Please advise.
Thanks,
Fanbin




Caused by: org.apache.flink.table.api.ValidationException: Unable to create
a source for reading table
'default_catalog.default_database.analytics_service'.

Table options are:

'connector'='kafka'
'format'='json'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='localhost:9093'
'properties.group.id'='xxx'
'properties.security.protocol'='SSL'
'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
'properties.ssl.key.password'='secret'
'properties.ssl.keystore.location'='xxx.jks'
'properties.ssl.keystore.password'='secret'
'properties.ssl.keystore.type'='JKS'
'properties.ssl.truststore.location'='xxx.jks'
'properties.ssl.truststore.password'='secret'
'properties.ssl.truststore.type'='JKS'
'properties.zookeeper.connect'='localhost:2181'
'scan.startup.mode'='earliest-offset'
'topic'='events'
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
at
com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
at
com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
a connector using option ''connector'='kafka''.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUti

Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Congxian Qiu
Hi Bajaj,
A savepoint does contain both the metadata and the data in Flink 1.10; it does
not need to reference any checkpoint data.
Best,
Congxian


Bajaj, Abhinav  wrote on Tue, Nov 17, 2020 at 8:58 AM:

> Hi,
>
>
>
> I am trying to understand the Flink 1.10 savepoints related documentation
> 
> that mentions -
>
> *“**When triggering a savepoint, a new savepoint directory is created
> where the data as well as the meta data will be stored.**”*
>
>
>
> However, I came across this issue -
> https://issues.apache.org/jira/browse/FLINK-5763 which mentions the
> problem -
>
> *“**For file system based checkpoints (FsStateBackend,
> RocksDBStateBackend) this results in the savepoint referencing files from
> the checkpoint directory (usually different than )**”*
>
>
>
> My understanding of the issue is that savepoints were not storing the data
> and were pointing to checkpoints for actual state data.
>
> The issue has been fixed and changes made into Flink 1.11.
>
>
>
> But, I would like to get the behavior for Flink 1.10 confirmed.
>
>
>
> Does savepoint in Flink 1.10 store both the data & metadata? OR it has
> references to checkpoints & need access to checkpoints directory to restore
> state?
>
>
>
> Thanks much,
>
> Abhinav Bajaj
>


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-11-16 Thread Eleanore Jin
Hi Till,

Thanks for the response! The metrics came from cAdvisor and were visualized
via the dashboard shipped with Kubernetes. I have actually been running the
Flink job for the past 2 weeks and the memory usage has stabilized; there is
no issue so far. I still could not figure out why it was trending up initially.

Thanks a lot for the help!
Eleanore

On Fri, Nov 13, 2020 at 7:01 AM Till Rohrmann  wrote:

> Hi Eleanore,
>
> sorry for my late reply. The heap dump you have sent does not look
> problematic. How do you measure the pod memory usage exactly? If you start
> the Flink process with -Xms5120m -Xmx5120m then Flink should allocate 5120
> MB of heap memory. Hence, this should be exactly what you are seeing in
> your memory usage graph. This should actually happen independent of the
> checkpointing.
>
> Maybe you can also share the debug logs with us. Maybe they contain some
> more information.
>
> Cheers,
> Till
>
> On Sat, Oct 24, 2020 at 12:24 AM Eleanore Jin 
> wrote:
>
>> I also tried enable native memory tracking, via jcmd, here is the memory
>> breakdown: https://ibb.co/ssrZB4F
>>
>> since job manager memory configuration for flink 1.10.2 only has
>> jobmanager.heap.size, and it only translates to heap settings, should I
>> also set -XX:MaxDirectMemorySize and -XX:MaxMetaspaceSize for job
>> manager? And any recommendations?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Fri, Oct 23, 2020 at 9:28 AM Eleanore Jin 
>> wrote:
>>
>>> Hi Till,
>>>
>>> please see the screenshot of heap dump: https://ibb.co/92Hzrpr
>>>
>>> Thanks!
>>> Eleanore
>>>
>>> On Fri, Oct 23, 2020 at 9:25 AM Eleanore Jin 
>>> wrote:
>>>
 Hi Till,
 Thanks a lot for the prompt response, please see below information.

 1. how much memory assign to JM pod?
 6g for container memory limit, 5g for jobmanager.heap.size, I think
 this is the only available jm memory configuration for flink 1.10.2

 2. Have you tried with newer Flink versions?
 I am actually using Apache Beam, so the latest version they support for
 Flink is 1.10

 3. What statebackend is used?
 FsStateBackend, and the checkpoint size is around 12MB from checkpoint
 metrics, so I think it is not get inlined

 4. What is state.checkpoints.num-retained?
 I did not configure this explicitly, so by default only 1 should be
 retained

 5. Anything suspicious from JM log?
 There is no Exception nor Error, the only thing I see is the below logs
 keeps on repeating

 {"@timestamp":"2020-10-23T16:05:20.350Z","@version":"1","message":"Disabling
 threads for Delete operation as thread count 0 is <=
 1","logger_name":"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor","thread_name":"jobmanager-future-thread-4","level":"WARN","level_value":3}

 6. JVM args obtained vis jcmd

 -Xms5120m -Xmx5120m -XX:MaxGCPauseMillis=20
 -XX:-OmitStackTraceInFastThrow


 7. Heap info returned by jcmd  GC.heap_info

 it suggested only about 1G of the heap is used

 garbage-first heap   total 5242880K, used 1123073K
 [0x0006c000, 0x0008)

   region size 2048K, 117 young (239616K), 15 survivors (30720K)

  Metaspace   used 108072K, capacity 110544K, committed 110720K,
 reserved 1146880K

   class spaceused 12963K, capacity 13875K, committed 13952K,
 reserved 1048576K


 8. top -p 

 it suggested for flink job manager java process 4.8G of physical memory
 is consumed

 PID USER  PR  NI    VIRT    RES    SHR S  %CPU %MEM    TIME+ COMMAND

   1 root  20   0 13.356g 4.802g  22676 S   6.0  7.6 37:48.62 java



 Thanks a lot!
 Eleanore


 On Fri, Oct 23, 2020 at 4:19 AM Till Rohrmann 
 wrote:

> Hi Eleanore,
>
> how much memory did you assign to the JM pod? Maybe the limit is so
> high that it takes a bit of time until GC is triggered. Have you tried
> whether the same problem also occurs with newer Flink versions?
>
> The difference between checkpoints enabled and disabled is that the JM
> needs to do a bit more bookkeeping in order to track the completed
> checkpoints. If you are using the HeapStateBackend, then all states 
> smaller
> than state.backend.fs.memory-threshold will get inlined, meaning that they
> are sent to the JM and stored in the checkpoint meta file. This can
> increase the memory usage of the JM process. Depending on
> state.checkpoints.num-retained this can grow as large as number retained
> checkpoints times the checkpoint size. However, I doubt that this adds up
> to several GB of additional space.
>
> In order to better understand the problem, the debug logs of your JM
> could be helpful. Also a heap dump might be able to point us towards the
> compo

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-11-16 Thread Hector He
May I ask about the deprecation of readFileStream(...): is there an
alternative to this method? The source code leads me to use readFile instead,
but it does not behave like readFileStream. readFileStream can read file
content incrementally, while readFile with the
FileProcessingMode.PROCESS_CONTINUOUSLY argument re-reads the whole file
content every time the content changes. So why does Flink deprecate
readFileStream without providing a better alternative?

From the description in the official documentation at the link below,
FileProcessingMode.PROCESS_CONTINUOUSLY will break the “exactly-once”
semantics.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
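
For reference, a minimal sketch (Java; path and scan interval are placeholders)
of the readFile alternative being discussed, which, as noted above,
re-processes a modified file in full in PROCESS_CONTINUOUSLY mode:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileContinuously {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String path = "file:///tmp/input";  // placeholder

    // Re-scan the path every 10 seconds; a modified file is re-read as a whole.
    DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path(path)),
        path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10_000L);

    lines.print();
    env.execute("read file continuously");
  }
}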



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink AutoScaling EMR

2020-11-16 Thread Rex Fenley
Thanks for all the input!

On Sun, Nov 15, 2020 at 6:59 PM Xintong Song  wrote:

> Is there a way to make the new yarn job only on the new hardware?
>
> I think you can simply decommission the nodes from Yarn, so that new
> containers will not be allocated from those nodes. You might also need a
> large decommission timeout, upon which all the remaining running contains
> on the decommissioning node will be killed.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Nov 13, 2020 at 2:57 PM Robert Metzger 
> wrote:
>
>> Hi,
>> it seems that YARN has a feature for targeting specific hardware:
>> https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html
>> In any case, you'll need enough spare resources for some time to be able
>> to run your job twice for this kind of "zero downtime handover"
>>
>> On Thu, Nov 12, 2020 at 6:10 PM Rex Fenley  wrote:
>>
>>> Awesome, thanks! Is there a way to make the new yarn job only on the new
>>> hardware? Or would the two jobs have to run on intersecting hardware and
>>> then would be switched on/off, which means we'll need a buffer of resources
>>> for our orchestration?
>>>
>>> Also, good point on recovery. I'll spend some time looking into this.
>>>
>>> Thanks
>>>
>>>
>>> On Wed, Nov 11, 2020 at 11:53 PM Robert Metzger 
>>> wrote:
>>>
 Hey Rex,

 the second approach (spinning up a standby job and then doing a
 handover) sounds more promising to implement, without rewriting half of the
 Flink codebase ;)
 What you need is a tool that orchestrates creating a savepoint,
 starting a second job from the savepoint and then communicating with a
 custom sink implementation that can be switched on/off in the two jobs.
 With that approach, you should have almost no downtime, just increased
 resource requirements during such a handover.

 What you need to consider as well is that this handover process only
 works for scheduled maintenance. If you have a system failure, you'll have
 downtime until the last checkpoint is restored.
 If you are trying to reduce the potential downtime overall, I would
 rather recommend optimizing the checkpoint restore time, as this can cover
 both scheduled maintenance and system failures.

 Best,
 Robert





 On Wed, Nov 11, 2020 at 8:56 PM Rex Fenley  wrote:

> Another thought, would it be possible to
> * Spin up new core or task nodes.
> * Run a new copy of the same job on these new nodes from a savepoint.
> * Have the new job *not* write to the sink until the other job is
> torn down?
>
> This would allow us to be eventually consistent and maintain writes
> going through without downtime. As long as whatever is buffering the sink
> doesn't run out of space it should work just fine.
>
> We're hoping to achieve consistency in less than 30s ideally.
>
> Again though, if we could get savepoints to restore in less than 30s
> that would likely be sufficient for our purposes.
>
> Thanks!
>
> On Wed, Nov 11, 2020 at 11:42 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I'm trying to find a solution for auto scaling our Flink EMR cluster
>> with 0 downtime using RocksDB as state storage and S3 backing store.
>>
>> My current thoughts are like so:
>> * Scaling an Operator dynamically would require all keyed state to be
>> available to the set of subtasks for that operator, therefore a set of
>> subtasks must be reading to and writing from the same RocksDB. I.e. to
>> scale in and out subtasks in that set, they need to read from the same
>> Rocks.
>> * Since subtasks can run on different core nodes, is it possible to
>> have different core nodes read/write to the same RocksDB?
>> * When's the safe point to scale in and out an operator? Only right
>> after a checkpoint possibly?
>>
>> If the above is not possible then we'll have to use save points which
>> means some downtime, therefore:
>> * Scaling out during high traffic is arguably more important to react
>> quickly to than scaling in during low traffic. Is it possible to add more
>> core nodes to EMR without disturbing a job? If so then maybe we can
>> orchestrate running a new job on new nodes and then loading a savepoint
>> from a currently running job.
>>
>> Lastly
>> * Save Points for ~70Gib of data take on the order of minutes to tens
>> of minutes for us to restore from, is there any way to speed up 
>> restoration?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com
>>
>
>
> --
>
> Rex Fenley  |  Softwar

Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Bajaj, Abhinav
Hi,

I am trying to understand the Flink 1.10 savepoints related documentation
that mentions -
“When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored.”

However, I came across this issue - 
https://issues.apache.org/jira/browse/FLINK-5763 which mentions the problem -
“For file system based checkpoints (FsStateBackend, RocksDBStateBackend) this 
results in the savepoint referencing files from the checkpoint directory 
(usually different than )”

My understanding of the issue is that savepoints were not storing the data and 
were pointing to checkpoints for actual state data.
The issue has been fixed and changes made into Flink 1.11.

But, I would like to get the behavior for Flink 1.10 confirmed.

Does savepoint in Flink 1.10 store both the data & metadata? OR it has 
references to checkpoints & need access to checkpoints directory to restore 
state?

Thanks much,
Abhinav Bajaj


Re: left join flink stream

2020-11-16 Thread Guowei Ma
Hi, Youzha

In general, `CoGroup` is for window-based operations. Whether it can
satisfy your requirements depends on your specific scenario. But if you
want to treat the MySQL table as a dimension table, there are two other
ways:
1. Use the Table/SQL API. You can find a SQL example (a temporal join with the
JDBC table as a dimension table) in the JDBC table connector docs [1] and more
join information in [2]; a sketch follows after the links below.
2. Use the DataStream API. You could check whether the `AsyncIO` function
satisfies your requirements; you can find an example in [3].


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
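
To make option 1 concrete, a rough sketch in Java + SQL; all table names,
columns, and connection settings below are placeholders, not taken from this
thread:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaMysqlLookupJoin {

  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // Kafka stream as the probe side, with a processing-time attribute
    // that the lookup join needs.
    tEnv.executeSql(
        "CREATE TABLE orders (" +
        "  user_id BIGINT," +
        "  amount DOUBLE," +
        "  proc_time AS PROCTIME()" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'orders'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'properties.group.id' = 'my-group'," +
        "  'format' = 'json')");

    // MySQL table as the dimension (lookup) side.
    tEnv.executeSql(
        "CREATE TABLE users (" +
        "  user_id BIGINT," +
        "  user_name STRING" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
        "  'table-name' = 'users'," +
        "  'username' = 'root'," +
        "  'password' = 'secret')");

    // Temporal (lookup) join; LEFT JOIN keeps Kafka rows without a MySQL match.
    Table joined = tEnv.sqlQuery(
        "SELECT o.user_id, o.amount, u.user_name " +
        "FROM orders AS o " +
        "LEFT JOIN users FOR SYSTEM_TIME AS OF o.proc_time AS u " +
        "ON o.user_id = u.user_id");

    // e.g. joined.executeInsert("some_sink") to write the result somewhere.
  }
}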

Best,
Guowei


On Mon, Nov 16, 2020 at 11:20 PM Youzha  wrote:

> Hi, I want to join a Kafka stream with a MySQL reference table.
> How can I do this with a Flink stream? Can the coGroup function handle
> this, or does anyone have Java sample code for this case? I’ve read some
> articles saying that the coGroup function can do a left outer join, but I’m
> still confused about how to implement it because I just learned Flink
> streaming.
>
>
> I need advice, please.
>


Re: Random Task executor shutdown

2020-11-16 Thread Guowei Ma
Hi, Arnaud
Would you like to share the log of the shutdown task executor?
BTW could you check the gc log of the task executor?
Best,
Guowei


On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud 
wrote:

> (reposted with proper subject line -- sorry for the copy/paste)
> -Original message-
> Hello,
>
> I'm running Flink 1.10 on a yarn cluster. I have a streaming application,
> that, when under heavy load, fails from time to time with this unique error
> message in the whole yarn log:
>
> (...)
> 2020-11-15 16:18:42,202 WARN
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
> late message for now expired checkpoint attempt 63 from task
> 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at
> container_e38_1604477334666_0960_01_04 @ xxx (dataPort=33099).
> 2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager
>  - Closing TaskExecutor connection
> container_e38_1604477334666_0960_01_04 because: The TaskExecutor is
> shutting down.
> 2020-11-15 16:18:55,087 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (7/15)
> (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
>
>   at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-15 16:18:55,092 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - Calculating tasks to restart to recover the failed task
> 2f6467d98899e64a4721f0a7b6a059a8_6.
> 2020-11-15 16:18:55,101 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - 230 tasks should be restarted to recover the failed task
> 2f6467d98899e64a4721f0a7b6a059a8_6.
> (...)
>
> What could be the cause of this failure? Why is there no other error
> message?
>
> I've tried to increase the value of heartbeat.timeout, thinking that maybe
> it was due to a slow responding mapper, but it did not solve the issue.
>
> Best regards,
> Arnaud
>
> 
>
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Kafka SQL table Re-partition via Flink SQL

2020-11-16 Thread Slim Bouguerra
Hi,
I am trying to author a SQL job that repartitions a Kafka SQL table
into another Kafka SQL table.
As an example, the input/output tables have exactly the same SQL schema (see
below) and data; the only difference is that the new Kafka stream needs to be
repartitioned using a simple projection such as item_id (the input stream is
partitioned by user_id).
Is there a way to do this via SQL only, without using
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?

In other words, how can we express the stream key (keyBy) via the SQL
layer?

For instance, in Hive they expose a system column called __key or
__partition that can be used to do this via the SQL layer (see
https://github.com/apache/hive/tree/master/kafka-handler#table-definitions)

CREATE TABLE input_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_uid',
 'properties.bootstrap.servers' = 'localhost:9092',
)

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
)



-- 

B-Slim
___/\/\/\___/\/\/\___/\/\/\___/\/\/\___/\/\/\___


Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
Thank you Kye for your insights... In my mind, if the job runs without
problems one or more times, the heap size, and thus the metaspace size, is
big enough and I should not increase it (on the same data of course).
So I'll try to understand who is leaking what. The advice to avoid
dynamic class loading is just a workaround to me; there's something wrong
going on, and tomorrow I'll try to understand the root cause of the
problem using -XX:NativeMemoryTracking=summary as you suggested.

I'll keep you up to date with my findings..

Best,
Flavio

On Mon, Nov 16, 2020 at 8:22 PM Kye Bae  wrote:

> Hello!
>
> The JVM metaspace is where all the classes (not class instances or
> objects) get loaded. jmap -histo is going to show you the heap space usage
> info not the metaspace.
>
> You could inspect what is happening in the metaspace by using jcmd (e.g.,
> jcmd JPID VM.native_memory summary) after restarting the cluster with
> "-XX:NativeMemoryTracking=summary".
>
> As the error message suggests, you may need to increase
> taskmanager.memory.jvm-metaspace.size,
> but you need to be slightly careful when specifying the memory parameters
> in flink-conf.yaml in Flink 1.10 (they have an issue with a confusing error
> message).
>
> Another thing to keep in mind is that you may want to avoid using dynamic
> classloading (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#avoiding-dynamic-classloading-for-user-code):
> when the job continuously fails for some temporary issues, it will load the
> same class files into the metaspace multiple times and could exceed
> whatever limit you set.
>
> -K
>
> On Mon, Nov 16, 2020 at 12:39 PM Jan Lukavský  wrote:
>
>> The exclusions should not have any impact on that, because what defines
>> which classloader will load which class is not the presence or particular
>> class in a specific jar, but the configuration of parent-first-patterns [1].
>>
>> If you don't use any flink internal imports, than it still might be the
>> case, that a class in any of the packages defined by the
>> parent-first-pattern to hold reference to your user-code classes, which
>> would cause the leak. I'd recommend to inspect the heap dump after several
>> restarts of the application and look for reference to Class objects from
>> the root set.
>>
>> Jan
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#class-loading
>> 
>> On 11/16/20 5:34 PM, Flavio Pompermaier wrote:
>>
>> I've tried to remove all possible imports of classes not contained in the
>> fat jar but I still face the same problem.
>> I've also tried to reduce as much as possible the exclude in the shade
>> section of the maven plugin (I took the one at [1]) so now I exclude only
>> few dependencies..could it be that I should include org.slf4j:* if I use
>> static import of it?
>>
>> 
>> 
>>   com.google.code.findbugs:jsr305
>>   org.slf4j:*
>>   log4j:*
>> 
>> 
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies
>> 
>>
>> On Mon, Nov 16, 2020 at 3:29 PM Jan Lukavský  wrote:
>>
>>> Yes, that could definitely cause this. You should probably avoid using
>>> these flink-internal shaded classes and ship your own versions (not shaded).
>>>
>>> Best,
>>>
>>>  Jan
>>> On 11/16/20 3:22 PM, Flavio Pompermaier wrote:
>>>
>>> Thank you Jan for your valuable feedback.
>>> Could it be that I should not use import shaded-jackson classes in my
>>> user code?
>>> For example import
>>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?
>>>
>>> Bets,
>>> Flavio
>>>
>>> On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský  wrote:
>>>
 Hi Flavio,

 when I encountered quite similar problem that you describe, it was
 related to a static storage located in class that was loaded
 "parent-first". In my case it was it was in java.lang.ClassValue, but it
 might (and probably will be) different in your case. The problem is that if
 user-code registers something in some (static) storage located in class
 loaded with parent (TaskTracker) classloader, then its associated classes
 will never be GC'd and Metaspace will grow. A good starting point would be
 not to focus on biggest consumers of heap (in general), but to look at
 where the 15k objects of type Class are referenced from. That might help
 you figure this out. I'm not sure if there is

Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Kye Bae
Hello!

The JVM metaspace is where all the classes (not class instances or objects)
get loaded. jmap -histo is going to show you the heap space usage info, not
the metaspace.

You could inspect what is happening in the metaspace by using jcmd (e.g.,
jcmd JPID VM.native_memory summary) after restarting the cluster with
"-XX:NativeMemoryTracking=summary".

As the error message suggests, you may need to increase
taskmanager.memory.jvm-metaspace.size,
but you need to be slightly careful when specifying the memory parameters
in flink-conf.yaml in Flink 1.10 (they have an issue with a confusing error
message).

Another thing to keep in mind is that you may want to avoid using dynamic
classloading (
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#avoiding-dynamic-classloading-for-user-code):
when the job continuously fails for some temporary issues, it will load the
same class files into the metaspace multiple times and could exceed
whatever limit you set.

-K

On Mon, Nov 16, 2020 at 12:39 PM Jan Lukavský  wrote:

> The exclusions should not have any impact on that, because what defines
> which classloader will load which class is not the presence or particular
> class in a specific jar, but the configuration of parent-first-patterns [1].
>
> If you don't use any flink internal imports, than it still might be the
> case, that a class in any of the packages defined by the
> parent-first-pattern to hold reference to your user-code classes, which
> would cause the leak. I'd recommend to inspect the heap dump after several
> restarts of the application and look for reference to Class objects from
> the root set.
>
> Jan
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#class-loading
> 
> On 11/16/20 5:34 PM, Flavio Pompermaier wrote:
>
> I've tried to remove all possible imports of classes not contained in the
> fat jar but I still face the same problem.
> I've also tried to reduce as much as possible the exclude in the shade
> section of the maven plugin (I took the one at [1]) so now I exclude only
> few dependencies..could it be that I should include org.slf4j:* if I use
> static import of it?
>
> 
> 
>   com.google.code.findbugs:jsr305
>   org.slf4j:*
>   log4j:*
> 
> 
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies
> 
>
> On Mon, Nov 16, 2020 at 3:29 PM Jan Lukavský  wrote:
>
>> Yes, that could definitely cause this. You should probably avoid using
>> these flink-internal shaded classes and ship your own versions (not shaded).
>>
>> Best,
>>
>>  Jan
>> On 11/16/20 3:22 PM, Flavio Pompermaier wrote:
>>
>> Thank you Jan for your valuable feedback.
>> Could it be that I should not use import shaded-jackson classes in my
>> user code?
>> For example import
>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?
>>
>> Bets,
>> Flavio
>>
>> On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský  wrote:
>>
>>> Hi Flavio,
>>>
>>> when I encountered quite similar problem that you describe, it was
>>> related to a static storage located in class that was loaded
>>> "parent-first". In my case it was it was in java.lang.ClassValue, but it
>>> might (and probably will be) different in your case. The problem is that if
>>> user-code registers something in some (static) storage located in class
>>> loaded with parent (TaskTracker) classloader, then its associated classes
>>> will never be GC'd and Metaspace will grow. A good starting point would be
>>> not to focus on biggest consumers of heap (in general), but to look at
>>> where the 15k objects of type Class are referenced from. That might help
>>> you figure this out. I'm not sure if there is something that can be done in
>>> general to prevent this type of leaks. That would be probably question on
>>> dev@ mailing list.
>>>
>>> Best,
>>>
>>>  Jan
>>> On 11/16/20 2:27 PM, Flavio Pompermaier wrote:
>>>
>>> Hello everybody,
>>> I was writing this email when a similar thread on this mailing list
>>> appeared..
>>> The difference is that the other problem seems to be related with Flink
>>> 1.10 on YARN and does not output anything helpful in debugging the cause of
>>> the problem.
>>>
>>> Indeed, in my use case I use Flink 1.11.0 and Flink on a standalone
>>> session cluster (the job is submitted to the cluster using the CLI client).
>>> The problem arises when I submit the same job for about 20 times

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
The exclusions should not have any impact on that, because what defines 
which classloader will load which class is not the presence or 
particular class in a specific jar, but the configuration of 
parent-first-patterns [1].


If you don't use any flink internal imports, than it still might be the 
case, that a class in any of the packages defined by the 
parent-first-pattern to hold reference to your user-code classes, which 
would cause the leak. I'd recommend to inspect the heap dump after 
several restarts of the application and look for reference to Class 
objects from the root set.


Jan

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#class-loading


On 11/16/20 5:34 PM, Flavio Pompermaier wrote:
I've tried to remove all possible imports of classes not contained in 
the fat jar but I still face the same problem.
I've also tried to reduce as much as possible the exclude in the shade 
section of the maven plugin (I took the one at [1]) so now I exclude 
only few dependencies..could it be that I should include org.slf4j:* 
if I use static import of it?



    
com.google.code.findbugs:jsr305
      org.slf4j:*
      log4j:*
    


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies


On Mon, Nov 16, 2020 at 3:29 PM Jan Lukavský wrote:


Yes, that could definitely cause this. You should probably avoid
using these flink-internal shaded classes and ship your own
versions (not shaded).

Best,

 Jan

On 11/16/20 3:22 PM, Flavio Pompermaier wrote:

Thank you Jan for your valuable feedback.
Could it be that I should not use import shaded-jackson classes
in my user code?
For example import

org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?

Bets,
Flavio

On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský <je...@seznam.cz> wrote:

Hi Flavio,

when I encountered quite similar problem that you describe,
it was related to a static storage located in class that was
loaded "parent-first". In my case it was it was in
java.lang.ClassValue, but it might (and probably will be)
different in your case. The problem is that if user-code
registers something in some (static) storage located in class
loaded with parent (TaskTracker) classloader, then its
associated classes will never be GC'd and Metaspace will
grow. A good starting point would be not to focus on biggest
consumers of heap (in general), but to look at where the 15k
objects of type Class are referenced from. That might help
you figure this out. I'm not sure if there is something that
can be done in general to prevent this type of leaks. That
would be probably question on dev@ mailing list.

Best,

 Jan

On 11/16/20 2:27 PM, Flavio Pompermaier wrote:

Hello everybody,
I was writing this email when a similar thread on this
mailing list appeared..
The difference is that the other problem seems to be related
with Flink 1.10 on YARN and does not output anything helpful
in debugging the cause of the problem.

Indeed, in my use case I use Flink 1.11.0 and Flink on a
standalone session cluster (the job is submitted to the
cluster using the CLI client).
The problem arises when I submit the same job for about 20
times (this number unfortunately is not deterministic and
can change a little bit). The error reported by the Task
Executor is related to the ever growing Metaspace..the error
seems to be pretty detailed [1].

I found the same issue in some previous threads on this
mailing list and I've tried to figure it out the cause of
the problem. The issue is that looking at the objects
allocated I don't really get an idea of the source of the
problem because the type of objects that are consuming the
memory are of general purpose (i.e. Bytes, Integers and
Strings)...these are my "top" memory consumers if looking at
the output of  jmap -histo :

At run 0:

 num     #instances         #bytes  class name (module)
---
   1:         46238       13224056  [B (java.base@11.0.9.1)
   2:          3736        6536672  [I (java.base@11.0.9.1)
   3:         38081         913944  java.lang.String (java.base@11.0.9.1)
   4:            26         852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:          7146         844984  java.lang.Class (java.base@11.0.9.1)

At run 1:

 

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
I've tried to remove all possible imports of classes not contained in the
fat jar but I still face the same problem.
I've also tried to reduce the excludes in the shade section of the maven
plugin as much as possible (I took the ones at [1]), so now I exclude only a
few dependencies.. could it be that I should include org.slf4j:* if I use a
static import of it?



<artifactSet>
  <excludes>
    <exclude>com.google.code.findbugs:jsr305</exclude>
    <exclude>org.slf4j:*</exclude>
    <exclude>log4j:*</exclude>
  </excludes>
</artifactSet>



[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies

On Mon, Nov 16, 2020 at 3:29 PM Jan Lukavský  wrote:

> Yes, that could definitely cause this. You should probably avoid using
> these flink-internal shaded classes and ship your own versions (not shaded).
>
> Best,
>
>  Jan
> On 11/16/20 3:22 PM, Flavio Pompermaier wrote:
>
> Thank you Jan for your valuable feedback.
> Could it be that I should not use import shaded-jackson classes in my user
> code?
> For example import
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?
>
> Bets,
> Flavio
>
> On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský  wrote:
>
>> Hi Flavio,
>>
>> when I encountered quite similar problem that you describe, it was
>> related to a static storage located in class that was loaded
>> "parent-first". In my case it was it was in java.lang.ClassValue, but it
>> might (and probably will be) different in your case. The problem is that if
>> user-code registers something in some (static) storage located in class
>> loaded with parent (TaskTracker) classloader, then its associated classes
>> will never be GC'd and Metaspace will grow. A good starting point would be
>> not to focus on biggest consumers of heap (in general), but to look at
>> where the 15k objects of type Class are referenced from. That might help
>> you figure this out. I'm not sure if there is something that can be done in
>> general to prevent this type of leaks. That would be probably question on
>> dev@ mailing list.
>>
>> Best,
>>
>>  Jan
>> On 11/16/20 2:27 PM, Flavio Pompermaier wrote:
>>
>> Hello everybody,
>> I was writing this email when a similar thread on this mailing list
>> appeared..
>> The difference is that the other problem seems to be related with Flink
>> 1.10 on YARN and does not output anything helpful in debugging the cause of
>> the problem.
>>
>> Indeed, in my use case I use Flink 1.11.0 and Flink on a standalone
>> session cluster (the job is submitted to the cluster using the CLI client).
>> The problem arises when I submit the same job for about 20 times (this
>> number unfortunately is not deterministic and can change a little bit). The
>> error reported by the Task Executor is related to the ever growing
>> Metaspace..the error seems to be pretty detailed [1].
>>
>> I found the same issue in some previous threads on this mailing list and
>> I've tried to figure it out the cause of the problem. The issue is that
>> looking at the objects allocated I don't really get an idea of the source
>> of the problem because the type of objects that are consuming the memory
>> are of general purpose (i.e. Bytes, Integers and Strings)...these are my
>> "top" memory consumers if looking at the output of  jmap -histo :
>>
>> At run 0:
>>
>>  num #instances #bytes  class name (module)
>> ---
>>1: 46238   13224056  [B (java.base@11.0.9.1)
>>2:  37366536672  [I (java.base@11.0.9.1)
>>3: 38081 913944  java.lang.String (java.base@11.0.9.1)
>>4:26 852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
>>5:  7146 844984  java.lang.Class (java.base@11.0.9.1)
>>
>> At run 1:
>>
>>1:  77.608   25.317.496  [B (java.base@11.0.9.1)
>>2:   7.004    9.088.360  [I (java.base@11.0.9.1)
>>3:  15.814    1.887.256  java.lang.Class (java.base@11.0.9.1)
>>4:  67.381    1.617.144  java.lang.String (java.base@11.0.9.1)
>>5:   3.906    1.422.960  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)
>>
>> At run 6:
>>
>>1:  81.408   25.375.400  [B (java.base@11.0.9.1)
>>2:  12.479    7.249.392  [I (java.base@11.0.9.1)
>>3:  29.090    3.496.168  java.lang.Class (java.base@11.0.9.1)
>>4:   4.347    2.813.416  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)
>>5:  71.584    1.718.016  java.lang.String (java.base@11.0.9.1)
>>
>> At run 8:
>>
>>1: 985.979  127.193.256  [B (java.base@11.0.9.1)
>>2:  35.400   13.702.112  [I (java.base@11.0.9.1)
>>3: 260.387    6.249.288  java.lang.String (java.base@11.0.9.1)
>>4: 148.836    5.953.440  java.util.HashMap$KeyIterator (java.base@11.0.9.1)
>>5:  17.641    5.222.344  [Ljava.util.HashMap$Node; (java.bas

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-16 Thread Aljoscha Krettek

Hi,

thanks for the pointer, I should have remembered that thread earlier!

I'll try and sketch what the pipeline might look like to show what I 
mean by "enriching the message" and where the operations would sit.


DataStream<MyMessageType> source = ...

DataStream<Tuple2<MyMessageType, Long>> enriched = source
  .keyBy(...)
  .map(new StatefulSessionCalculator()); // or process()

DataStream<...> result = enriched
  .keyBy(new MyKeySelector())
  .window(EventTimeSessionWindows.withDynamicGap(
    new DynamicWindowGapExtractor()))
  .sideOutputLateData(lateDataSideOutputTag)
  .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
  .process(new ProcessWindowFunction(...));

The stateful map function could look something like this:

Tuple2<MyMessageType, Long> map(MyMessageType input) {
  ValueState<MyState> valueState = getState(myModelStateDescriptor);
  MyState state = valueState.value();
  state.update(input);
  long suggestedGap = state.getSuggestedGap();
  valueState.update(state);
  return Tuple2.of(input, suggestedGap);
}

The two operations have to be separate because the session gap extractor 
cannot be stateful.


I think, however, that it might be easier at this point to just use a 
stateful ProcessFunction to not have to deal with the somewhat finicky 
setup of the stateful extractor just to force it into the requirements 
of the session windows API.


Best,
Aljoscha

On 14.11.20 10:50, Simone Cavallarin wrote:

Hi Aljoscha,

I found a similar question of mine by 
KristoffSC
 Jan, 2020, called Session Windows with dynamic gap.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-Window-with-dynamic-gap-td31893.html

The idea is the same and at the end of the thread this was the solution that you 
suggested: "There are no plans of adding state support to the gap extractors but you 
could do this using a two-step approach, i.e. have an operation in front of the window 
that keeps track of session gaps, enriches the message with the gap that should be used 
and then the extractor extracts that gap. This is a more modular approach compared to 
putting everything in one operator/extractor."


1) Operation in front of the windows -> keep track of the session gaps (I have 
been reading all around for this)

   *   
(https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.html)
   *   
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
   *   
https://www.codota.com/code/java/classes/org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor


2) Enrich the message with the gap that should be used (this is a parameter that can
be, for example, an average of the last 10 gaps?)

   *   (I got lost.) How can I enrich a message coming from Kafka, maybe adding
this parameter to the next event?

3) The extractor extracts the gap (that will be used to calculate a new gap
parameter, so it needs to be sent back to point 1 and used in the windowing
process)


   *   (m.. okay now completely lost...)

Thanks
s


From: Simone Cavallarin 
Sent: 13 November 2020 16:55
To: Aljoscha Krettek 
Cc: user 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

+user@


From: Simone Cavallarin 
Sent: 13 November 2020 16:46
To: Aljoscha Krettek 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Hi Aljoscha,

When you said: You could use a stateful operation (like a ProcessFunction) to put a 
dynamic "gap" into the records and then use that gap with 
EventTimeSessionWindows. I understand the theory but I'm struggling to put it into
practice in code terms.



stream = steam
 .keyBy(new MyKeySelector())
 .window(EventTimeSessionWindows.withDynamicGap(new 
DynamicWindowGapExtractor()))
 .sideOutputLateData(lateDataSideOutputTag)
 .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some 
key is continuously coming within the session window gap
 .process(new ProcessWindowFunction(……));


Where ProcessWindowFunction(……) updates a parameter that is used inside
DynamicWindowGapExtractor()...

I found this on the following link: 
https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger

If you could help me with some examples where I can read some code, it would be
so helpful.

Thanks!


From: Aljoscha Krettek 
Sent: 13 November 2020 09:43
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Yes, you're right that Flink can do this with session windows but the
assign

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-16 Thread 김동원
Hi Ingo, 

Thank you for letting me know! I didn’t know it had already been discussed. 

Best,

Dongwon

> On Nov 17, 2020, at 1:12 AM, Ingo Bürk  wrote:
> 
> 
> Hi,
> 
> I ran into the same issue today. This is fixed in 1.11.3, the corresponding 
> bug was FLINK-19281.
> 
> A workaround is to switch the current catalog and database temporarily to 
> hive.navi and then not qualify the table name in the LIKE clause.
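
A minimal sketch of that workaround, reusing the catalog, database and table
names from the example further down in this thread (untested):

// temporarily switch the current catalog/database so the LIKE source
// can be referenced without qualification
tEnv.useCatalog("hive");
tEnv.useDatabase("navi");
// the new table itself can still be fully qualified
tEnv.executeSql("CREATE TABLE `inmem`.`default`.`copied` LIKE gps");
// switch back afterwards
tEnv.useCatalog("inmem");
tEnv.useDatabase("default");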
> 
> 
> Regards
> Ingo
> 
> 
>> On Mon, Nov 16, 2020, 17:04 Dongwon Kim  wrote:
>> Hi Danny~
>> Sorry for late reply,
>> 
>> Let's take a look at a running example:
>>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>   .inBatchMode()
>>>   .build();
>>> 
>>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>> 
>>> HiveCatalog hiveCatalog = new HiveCatalog("hive",null, args[1]);
>>> tEnv.registerCatalog("hive", hiveCatalog);
>>> 
>>> GenericInMemoryCatalog inmemCatalog = new GenericInMemoryCatalog("inmem");
>>> tEnv.registerCatalog("inmem", inmemCatalog);
>>> tEnv.useCatalog("inmem");
>>> 
>>> TableResult result = tEnv.executeSql(
>>>   "CREATE TABLE copied LIKE hive.navi.gps"
>>> );
>> 
>> I've got the following log messages:
>>> 00:50:22,157 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
>>> [] - Setting hive conf dir as /Users/eastcirclek/hive-conf
>>> 00:50:22,503 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
>>> [] - Created HiveCatalog 'hive'
>>> 00:50:22,515 INFO  hive.metastore   
>>> [] - Trying to connect to metastore with URI thrift://...:9083
>>> 00:50:22,678 INFO  hive.metastore   
>>> [] - Connected to metastore.
>>> 00:50:22,678 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
>>> [] - Connected to Hive metastore
>>> 00:50:22,799 INFO  org.apache.flink.table.catalog.CatalogManager
>>> [] - Set the current default catalog as [inmem] and the current default 
>>> database as [default].
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> Source table '`inmem`.`default`.`hive.navi.gps`' of the LIKE clause not 
>>> found in the catalog, at line 1, column 26
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:147)
>>> at java.util.Optional.orElseThrow(Optional.java:290)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:147)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:96)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>> at 
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>>> at Test.main(Test.java:53)
>> 
>> It seems like hive.navi.gps is recognized as a table name as a whole. 
>> I currently declare such a table by specifying all fields without the LIKE 
>> clause.
>> 
>> Do I miss something?
>> 
>> FYI, I'm working with Flink-1.11.2.
>> 
>> Thank you~
>> 
>> Best,
>> 
>> Dongwon
>> 
>> 
>>> On Fri, Nov 13, 2020 at 5:19 PM Danny Chan  wrote:
>>> Hi Dongwon ~
>>> 
>>> Table from different catalog/db is supported, you need to specify the full 
>>> path of the source table:
>>> 
>>> CREATE TABLE Orders_with_watermark (
>>> ...
>>> ) WITH (
>>> ...
>>> )
>>> LIKE my_catalog.my_db.Orders;
>>> 
> On Wed, Nov 11, 2020 at 2:53 PM Dongwon Kim  wrote:
 Hi,
 
 Is it disallowed to refer to a table from different databases or catalogs 
 when someone creates a table?
 
 According to [1], there's no way to refer to tables belonging to different 
 databases or catalogs.
 
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
 
 Best,
 
 Dongwon


Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-16 Thread Dongwon Kim
Hi Danny~
Sorry for late reply,

Let's take a look at a running example:

> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>   .inBatchMode()
>   .build();
>
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> HiveCatalog hiveCatalog = new HiveCatalog("hive",null, args[1]);
> tEnv.registerCatalog("hive", hiveCatalog);
>
> GenericInMemoryCatalog inmemCatalog = new GenericInMemoryCatalog("inmem");
> tEnv.registerCatalog("inmem", inmemCatalog);
> tEnv.useCatalog("inmem");
>
> TableResult result = tEnv.executeSql(
>   "CREATE TABLE copied LIKE hive.navi.gps"
> );
>

I've got the following log messages:

> 00:50:22,157 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>  [] - Setting hive conf dir as /Users/eastcirclek/hive-conf
> 00:50:22,503 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>  [] - Created HiveCatalog 'hive'
> 00:50:22,515 INFO  hive.metastore
>   [] - Trying to connect to metastore with URI thrift://...:9083
> 00:50:22,678 INFO  hive.metastore
>   [] - Connected to metastore.
> 00:50:22,678 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>  [] - Connected to Hive metastore
> 00:50:22,799 INFO  org.apache.flink.table.catalog.CatalogManager
>  [] - Set the current default catalog as [inmem] and the current
> default database as [default].
> *Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Source table
> '`inmem`.`default`.`hive.navi.gps`' of the LIKE clause not found in the
> catalog, at line 1, column 26*
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:147)
> at java.util.Optional.orElseThrow(Optional.java:290)
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:147)
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:96)
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at Test.main(Test.java:53)
>

It seems like hive.navi.gps is recognized as a table name as a whole.
I currently declare such a table by specifying all fields without the LIKE
clause.

Do I miss something?

FYI, I'm working with Flink-1.11.2.

Thank you~

Best,

Dongwon


On Fri, Nov 13, 2020 at 5:19 PM Danny Chan  wrote:

> Hi Dongwon ~
>
> Table from different catalog/db is supported, you need to specify the full
> path of the source table:
>
> CREATE TABLE Orders_with_watermark (
> *...*) WITH (
> *...*)LIKE my_catalog.my_db.Orders;
>
>
> On Wed, Nov 11, 2020 at 2:53 PM Dongwon Kim  wrote:
>
>> Hi,
>>
>> Is it disallowed to refer to a table from different databases or catalogs
>> when someone creates a table?
>>
>> According to [1], there's no way to refer to tables belonging to
>> different databases or catalogs.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>
>> Best,
>>
>> Dongwon
>>
>


left join flink stream

2020-11-16 Thread Youzha
Hi, I want to join a Kafka stream with a MySQL reference table.
How can I do this with Flink streaming? Can the coGroup function handle
this? Or does anyone have Java sample code for this case? I've read an
article that said the coGroup function can do a left outer join, but I'm still
confused about how to implement it because I just started learning Flink streaming.


Need advice please.


Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
Yes, that could definitely cause this. You should probably avoid using 
these flink-internal shaded classes and ship your own versions (not shaded).
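
For example (just a sketch; the plain Jackson dependency then has to be bundled
with the job jar instead of relying on the relocated classes inside flink-dist):

// instead of the relocated class:
//   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
// depend on Jackson directly and import it unshaded:
import com.fasterxml.jackson.databind.ObjectMapper;

ObjectMapper mapper = new ObjectMapper();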


Best,

 Jan

On 11/16/20 3:22 PM, Flavio Pompermaier wrote:

Thank you Jan for your valuable feedback.
Could it be that I should not import shaded Jackson classes in my 
user code?
For example import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?


Best,
Flavio

On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský > wrote:


Hi Flavio,

when I encountered quite similar problem that you describe, it was
related to a static storage located in class that was loaded
"parent-first". In my case it was it was in java.lang.ClassValue,
but it might (and probably will be) different in your case. The
problem is that if user-code registers something in some (static)
storage located in class loaded with parent (TaskTracker)
classloader, then its associated classes will never be GC'd and
Metaspace will grow. A good starting point would be not to focus
on biggest consumers of heap (in general), but to look at where
the 15k objects of type Class are referenced from. That might help
you figure this out. I'm not sure if there is something that can
be done in general to prevent this type of leaks. That would be
probably question on dev@ mailing list.

Best,

 Jan

On 11/16/20 2:27 PM, Flavio Pompermaier wrote:

Hello everybody,
I was writing this email when a similar thread on this mailing
list appeared..
The difference is that the other problem seems to be related
with Flink 1.10 on YARN and does not output anything helpful in
debugging the cause of the problem.

Indeed, in my use case I use Flink 1.11.0 and Flink on a
standalone session cluster (the job is submitted to the cluster
using the CLI client).
The problem arises when I submit the same job for about 20 times
(this number unfortunately is not deterministic and can change a
little bit). The error reported by the Task Executor is related
to the ever growing Metaspace..the error seems to be pretty
detailed [1].

I found the same issue in some previous threads on this mailing
list and I've tried to figure it out the cause of the problem.
The issue is that looking at the objects allocated I don't really
get an idea of the source of the problem because the type of
objects that are consuming the memory are of general purpose
(i.e. Bytes, Integers and Strings)...these are my "top" memory
consumers if looking at the output of  jmap -histo :

At run 0:

 num     #instances         #bytes  class name (module)
---
   1:         46238       13224056  [B (java.base@11.0.9.1
)
   2:          3736        6536672  [I (java.base@11.0.9.1
)
   3:         38081         913944  java.lang.String
(java.base@11.0.9.1 )
   4:            26         852384
 [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:          7146         844984  java.lang.Class
(java.base@11.0.9.1 )

At run 1:

   1:         77.608       25.317.496  [B (java.base@11.0.9.1
)
   2:          7.004        9.088.360  [I (java.base@11.0.9.1
)
   3:         15.814        1.887.256  java.lang.Class
(java.base@11.0.9.1 )
   4:         67.381        1.617.144  java.lang.String
(java.base@11.0.9.1 )
   5:          3.906        1.422.960  [Ljava.util.HashMap$Node;
(java.base@11.0.9.1 )

At run 6:

   1:         81.408       25.375.400  [B (java.base@11.0.9.1
)
   2:         12.479        7.249.392  [I (java.base@11.0.9.1
)
   3:         29.090        3.496.168  java.lang.Class
(java.base@11.0.9.1 )
   4:          4.347        2.813.416  [Ljava.util.HashMap$Node;
(java.base@11.0.9.1 )
   5:         71.584        1.718.016  java.lang.String
(java.base@11.0.9.1 )

At run 8:

   1:        985.979      127.193.256  [B (java.base@11.0.9.1
)
   2:         35.400       13.702.112  [I (java.base@11.0.9.1
)
   3:        260.387        6.249.288  java.lang.String
(java.base@11.0.9.1 )
   4:        148.836        5.953.440
 java.util.HashMap$KeyIterator (java.base@11.0.9.1
)
   5:         17.641        5.222.344  [Ljava.util.HashMap$Node;
(java.base@11.0.9.1 )

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
Thank you Jan for your valuable feedback.
Could it be that I should not import shaded Jackson classes in my user
code?
For example import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?

Best,
Flavio

On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský  wrote:

> Hi Flavio,
>
> when I encountered quite similar problem that you describe, it was related
> to a static storage located in class that was loaded "parent-first". In my
> case it was it was in java.lang.ClassValue, but it might (and probably will
> be) different in your case. The problem is that if user-code registers
> something in some (static) storage located in class loaded with parent
> (TaskTracker) classloader, then its associated classes will never be GC'd
> and Metaspace will grow. A good starting point would be not to focus on
> biggest consumers of heap (in general), but to look at where the 15k
> objects of type Class are referenced from. That might help you figure this
> out. I'm not sure if there is something that can be done in general to
> prevent this type of leaks. That would be probably question on dev@
> mailing list.
>
> Best,
>
>  Jan
> On 11/16/20 2:27 PM, Flavio Pompermaier wrote:
>
> Hello everybody,
> I was writing this email when a similar thread on this mailing list
> appeared..
> The difference is that the other problem seems to be related with Flink
> 1.10 on YARN and does not output anything helpful in debugging the cause of
> the problem.
>
> Indeed, in my use case I use Flink 1.11.0 and Flink on a standalone
> session cluster (the job is submitted to the cluster using the CLI client).
> The problem arises when I submit the same job for about 20 times (this
> number unfortunately is not deterministic and can change a little bit). The
> error reported by the Task Executor is related to the ever growing
> Metaspace..the error seems to be pretty detailed [1].
>
> I found the same issue in some previous threads on this mailing list and
> I've tried to figure it out the cause of the problem. The issue is that
> looking at the objects allocated I don't really get an idea of the source
> of the problem because the type of objects that are consuming the memory
> are of general purpose (i.e. Bytes, Integers and Strings)...these are my
> "top" memory consumers if looking at the output of  jmap -histo :
>
> At run 0:
>
>  num #instances #bytes  class name (module)
> ---
>1: 46238   13224056  [B (java.base@11.0.9.1)
>2:  37366536672  [I (java.base@11.0.9.1)
>3: 38081 913944  java.lang.String (java.base@11.0.9.1)
>4:26 852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
>5:  7146 844984  java.lang.Class (java.base@11.0.9.1)
>
> At run 1:
>
>1: 77.608   25.317.496  [B (java.base@11.0.9.1)
>2:  7.0049.088.360  [I (java.base@11.0.9.1)
>3: 15.8141.887.256  java.lang.Class (java.base@11.0.9.1
> )
>4: 67.3811.617.144  java.lang.String (
> java.base@11.0.9.1)
>5:  3.9061.422.960  [Ljava.util.HashMap$Node; (
> java.base@11.0.9.1)
>
> At run 6:
>
>1: 81.408   25.375.400  [B (java.base@11.0.9.1)
>2: 12.4797.249.392  [I (java.base@11.0.9.1)
>3: 29.0903.496.168  java.lang.Class (java.base@11.0.9.1
> )
>4:  4.3472.813.416  [Ljava.util.HashMap$Node; (
> java.base@11.0.9.1)
>5: 71.5841.718.016  java.lang.String (
> java.base@11.0.9.1)
>
> At run 8:
>
>1:985.979  127.193.256  [B (java.base@11.0.9.1)
>2: 35.400   13.702.112  [I (java.base@11.0.9.1)
>3:260.3876.249.288  java.lang.String (
> java.base@11.0.9.1)
>4:148.8365.953.440  java.util.HashMap$KeyIterator (
> java.base@11.0.9.1)
>5: 17.6415.222.344  [Ljava.util.HashMap$Node; (
> java.base@11.0.9.1)
>
> Thanks in advance for any help,
> Flavio
>
> [1]
> --
> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
> has occurred. This can mean two things: either the job requires a larger
> size of JVM metaspace to load classes or there is a class loading leak. In
> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
> should be increased. If the error persists (usually in cluster after
> several job (re-)submissions) then there is probably a class loading leak
> in user code or some of its dependencies which has to be investigated and
> fixed. The task executor has to be shutdown...
> at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?]
> at java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[?:?]
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoad

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský

Hi Flavio,

when I encountered a problem quite similar to the one you describe, it was 
related to static storage located in a class that was loaded 
"parent-first". In my case it was in java.lang.ClassValue, but it 
might (and probably will) be different in your case. The problem is that 
if user code registers something in some (static) storage located in a 
class loaded with the parent (TaskManager) classloader, then its associated 
classes will never be GC'd and Metaspace will grow. A good starting 
point would be not to focus on the biggest heap consumers (in general), 
but to look at where the 15k objects of type Class are referenced from. 
That might help you figure this out. I'm not sure if there is something 
that can be done in general to prevent this type of leak. That would 
probably be a question for the dev@ mailing list.
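
For illustration, a purely hypothetical sketch of that pattern (none of these
class names come from this thread):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// imagine this class ships on the TaskManager classpath, i.e. it is loaded parent-first
public final class ParentFirstRegistry {

  // static storage in a parent-loaded class survives job submissions
  private static final Map<Class<?>, Object> REGISTRY = new ConcurrentHashMap<>();

  private ParentFirstRegistry() {}

  public static void register(Class<?> key, Object value) {
    // a strong reference from a parent-loaded class to a user class means the
    // user-code classloader (and every class it loaded) can never be GC'd,
    // so Metaspace grows with each job submission
    REGISTRY.put(key, value);
  }
}

// and somewhere in user code (loaded child-first, once per submitted job):
//   ParentFirstRegistry.register(MyUserFunction.class, new MyUserFunction());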


Best,

 Jan

On 11/16/20 2:27 PM, Flavio Pompermaier wrote:

Hello everybody,
I was writing this email when a similar thread on this mailing list 
appeared..
The difference is that the other problem seems to be related 
with Flink 1.10 on YARN and does not output anything helpful in 
debugging the cause of the problem.


Indeed, in my use case I use Flink 1.11.0 and Flink on a standalone 
session cluster (the job is submitted to the cluster using the CLI 
client).
The problem arises when I submit the same job for about 20 times (this 
number unfortunately is not deterministic and can change a little 
bit). The error reported by the Task Executor is related to the ever 
growing Metaspace..the error seems to be pretty detailed [1].


I found the same issue in some previous threads on this mailing list 
and I've tried to figure it out the cause of the problem. The issue is 
that looking at the objects allocated I don't really get an idea of 
the source of the problem because the type of objects that are 
consuming the memory are of general purpose (i.e. Bytes, Integers and 
Strings)...these are my "top" memory consumers if looking at the 
output of  jmap -histo :


At run 0:

 num     #instances         #bytes  class name (module)
---
   1:         46238       13224056  [B (java.base@11.0.9.1 
)
   2:          3736        6536672  [I (java.base@11.0.9.1 
)
   3:         38081         913944  java.lang.String 
(java.base@11.0.9.1 )

   4:            26         852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:          7146         844984  java.lang.Class 
(java.base@11.0.9.1 )


At run 1:

   1:         77.608       25.317.496  [B (java.base@11.0.9.1 
)
   2:          7.004        9.088.360  [I (java.base@11.0.9.1 
)
   3:         15.814        1.887.256  java.lang.Class 
(java.base@11.0.9.1 )
   4:         67.381        1.617.144  java.lang.String 
(java.base@11.0.9.1 )
   5:          3.906        1.422.960  [Ljava.util.HashMap$Node; 
(java.base@11.0.9.1 )


At run 6:

   1:         81.408       25.375.400  [B (java.base@11.0.9.1 
)
   2:         12.479        7.249.392  [I (java.base@11.0.9.1 
)
   3:         29.090        3.496.168  java.lang.Class 
(java.base@11.0.9.1 )
   4:          4.347        2.813.416  [Ljava.util.HashMap$Node; 
(java.base@11.0.9.1 )
   5:         71.584        1.718.016  java.lang.String 
(java.base@11.0.9.1 )


At run 8:

   1:        985.979      127.193.256  [B (java.base@11.0.9.1 
)
   2:         35.400       13.702.112  [I (java.base@11.0.9.1 
)
   3:        260.387        6.249.288  java.lang.String 
(java.base@11.0.9.1 )
   4:        148.836        5.953.440  java.util.HashMap$KeyIterator 
(java.base@11.0.9.1 )
   5:         17.641        5.222.344  [Ljava.util.HashMap$Node; 
(java.base@11.0.9.1 )


Thanks in advance for any help,
Flavio

[1] 
--
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory 
error has occurred. This can mean two things: either the job requires 
a larger size of JVM metaspace to load classes or there is a class 
loading leak. In the first case 
'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak in user 
code or some of its dependencies which has to be investigated and 
fixed. The task executor has to be shutdown...

        at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?]
        at java.lang.ClassLoader

Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
Hello everybody,
I was writing this email when a similar thread appeared on this mailing
list. The difference is that the other problem seems to be related to Flink
1.10 on YARN and does not output anything helpful for debugging the cause of
the problem.

Indeed, in my use case I use Flink 1.11.0 on a standalone session
cluster (the job is submitted to the cluster using the CLI client).
The problem arises when I submit the same job about 20 times (this
number unfortunately is not deterministic and can change a little bit). The
error reported by the Task Executor is related to the ever-growing
Metaspace; the error seems to be pretty detailed [1].

I found the same issue in some previous threads on this mailing list and
I've tried to figure out the cause of the problem. The issue is that,
looking at the allocated objects, I don't really get an idea of the source
of the problem, because the objects consuming the memory are of general-purpose
types (i.e. bytes, integers and strings)... these are my
"top" memory consumers when looking at the output of  jmap -histo :

At run 0:

 num #instances #bytes  class name (module)
---
   1: 46238   13224056  [B (java.base@11.0.9.1)
   2:  37366536672  [I (java.base@11.0.9.1)
   3: 38081 913944  java.lang.String (java.base@11.0.9.1)
   4:26 852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:  7146 844984  java.lang.Class (java.base@11.0.9.1)

At run 1:

   1: 77.608   25.317.496  [B (java.base@11.0.9.1)
   2:  7.0049.088.360  [I (java.base@11.0.9.1)
   3: 15.8141.887.256  java.lang.Class (java.base@11.0.9.1)
   4: 67.3811.617.144  java.lang.String (java.base@11.0.9.1)
   5:  3.9061.422.960  [Ljava.util.HashMap$Node; (
java.base@11.0.9.1)

At run 6:

   1: 81.408   25.375.400  [B (java.base@11.0.9.1)
   2: 12.4797.249.392  [I (java.base@11.0.9.1)
   3: 29.0903.496.168  java.lang.Class (java.base@11.0.9.1)
   4:  4.3472.813.416  [Ljava.util.HashMap$Node; (
java.base@11.0.9.1)
   5: 71.5841.718.016  java.lang.String (java.base@11.0.9.1)

At run 8:

   1:985.979  127.193.256  [B (java.base@11.0.9.1)
   2: 35.400   13.702.112  [I (java.base@11.0.9.1)
   3:260.3876.249.288  java.lang.String (java.base@11.0.9.1)
   4:148.8365.953.440  java.util.HashMap$KeyIterator (
java.base@11.0.9.1)
   5: 17.6415.222.344  [Ljava.util.HashMap$Node; (
java.base@11.0.9.1)

Thanks in advance for any help,
Flavio

[1]
--
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
in user code or some of its dependencies which has to be investigated and
fixed. The task executor has to be shutdown...
at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?]
at java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[?:?]
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
~[?:?]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
~[?:?]
at java.net.URLClassLoader$1.run(URLClassLoader.java:458) ~[?:?]
at java.net.URLClassLoader$1.run(URLClassLoader.java:452) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at java.net.URLClassLoader.findClass(URLClassLoader.java:451) ~[?:?]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
[flink-dist_2.12-1.11.0.jar:1.11.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) [?:?]


Re: PyFlink Table API and UDF Limitations

2020-11-16 Thread Dian Fu
Hi Niklas,

> How can I ingest data in a batch table from Kafka or even better 
> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch 
> isn't offering a source at all.
> The only workaround which comes to my mind is to use the Kafka streaming 
> source and to apply a single very large window to create a bounded table. Do 
> you think that would work?
> Are there other options available? Maybe converting a Stream to a bounded 
> table is somehow possible? Thank you!


I think you are right that Kafka still doesn't support batch and there is no ES 
source for now. Another option is you could load the data into a connector 
which supports batch. Not sure if anybody else has a better idea about this.

> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
> think the JVM just skipped the class definitions, because they weren't 
> compatible. I actually wasn't aware of the fact that PyFlink comes with 
> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
> would make sense to point that out in the documentation. I think I never read 
> that and it might be missing. Unfortunately there is still one Exception 
> showing up at the very end of the job in the taskmanager log. I did the 
> verification you asked for and the class is present in both jar files. The 
> one which comes with Flink in the opt folder and the one of PyFlink. You can 
> find the log attached.
> I think the main question is which jar file has be loaded in in the three 
> envronments (executor, jobmanager, taskmanager). Is it fine to not put the 
> flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and 
> taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and 
> taskmanager?

PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar, 
flink-dist_2.11-1.12.0.jar, etc., so you don't need to manually add them (and also 
shouldn't do that). Could you remove the duplicate jars and try it again?

> No I don't think that there are additional exceptions besides 
> "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but 
> maybe take a look in the attached log files. This problem could be related to 
> 2., maybe the root cause is a class loading issue as well. What do you think? 
> You can find attached three log files. One for the executor, the jobmanager 
> and the taskmanager. Maybe you can find something useful.


I found one similar issue on the Beam side: 
https://issues.apache.org/jira/browse/BEAM-6258 
 which was resolved a long 
time ago. I'm still trying to reproduce this issue and will let you know if 
there is any progress. (It would be great if you could help to provide an example 
which could easily reproduce this issue.)

> This was very helpful. I was able to implement it. There is only one detail 
> missing. Is it possible to UNNEST an array of Rows or tuples? It would be 
> really great if I would be able to return a list with multiple fields. 
> Currently I'm just putting multiple value into a single VARCHAR, but that 
> means the information needs to be extracted later on. Maybe you have an idea 
> how to avoid that.

Currently, Pandas UDAF still doesn't support complex types, so I'm afraid 
that you have to put multiple values into a single VARCHAR for now.

Regards,
Dian


> On Nov 16, 2020, at 2:46 AM, Niklas Wilcke  wrote:
> 
> Hi Dian,
> 
> this was very helpful again. To the old questions I will answer inline as 
> well. Unfortunately also one new question popped up.
> 
> How can I ingest data in a batch table from Kafka or even better 
> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch 
> isn't offering a source at all.
> The only workaround which comes to my mind is to use the Kafka streaming 
> source and to apply a single very large window to create a bounded table. Do 
> you think that would work?
> Are there other options available? Maybe converting a Stream to a bounded 
> table is somehow possible? Thank you!
> 
> Kind Regards,
> Niklas
> 
> 
> 
>> On 13. Nov 2020, at 16:07, Dian Fu > > wrote:
>> 
>> Hi Niklas,
>> 
>> Good to know that this solution may work for you. Regarding to the questions 
>> you raised, please find my reply inline.
>> 
>> Regards,
>> Dian
>> 
>>> On Nov 13, 2020, at 8:48 PM, Niklas Wilcke  wrote:
>>> 
>>> Hi Dian,
>>> 
>>> thanks again for your response. In the meantime I tried out your proposal 
>>> using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but 
>>> I am facing some issues, which I would like to address. If this goes too 
>>> far, please let me know and I will open a new thread for each of the 
>>> questions. Let me share some more information about my current environment, 
>>> which will maybe help to answer the questions. I'm currently usi

Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

2020-11-16 Thread Tim Josefsson
To add to this, setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead
of EXACTLY_ONCE makes the problem go away so I imagine there is something
wrong with my setup.
I'm using Kafka 2.2 and I have the following things set on the cluster:

transaction.max.timeout.ms=360
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
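
One thing that is often worth checking with EXACTLY_ONCE (a sketch, not a
confirmed explanation of the log lines in the quoted message below): the
producer's transaction.timeout.ms must not exceed the broker's
transaction.max.timeout.ms, and Flink sets it to one hour by default if it is
not configured explicitly.

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
// keep the transaction timeout at or below the broker's transaction.max.timeout.ms
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

FlinkKafkaProducer<String> kafkaSinkProducer = new FlinkKafkaProducer<>(
        "enrichedPlayerSessionsTest",
        new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"), // as in the snippet below
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);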


On Mon, 16 Nov 2020 at 14:05, Tim Josefsson 
wrote:

> Hello!
>
> I'm having some problems with my KafkaProducer that I've been unable to
> find a solution to.
>
> I've set up a simple Flink Job that reads from one kafka topic, using
>*kafkaProps.setProperty("isolation.level", "read_committed") *
> since I want to support exactly once data in my application.
>
> After doing some enriching of the data I read from kafka I have the
> following producer set up
>
> FlinkKafkaProducer kafkaSinkProducer = new
> FlinkKafkaProducer<>(
> "enrichedPlayerSessionsTest",
> new
> KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
> producerProps,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE
> );
>
> The producer above is then added as a sink at the end of my Flink job.
>
> Now when I run this application I get the following message,
>
> 13:44:40,758 INFO  
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
> clientId=producer-6, transactionalId=Source: playerSession and 
> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 
> 4
> 13:44:40,759 INFO  org.apache.kafka.clients.producer.KafkaProducer
>- [Producer clientId=producer-6, transactionalId=Source: playerSession 
> and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
>
> Sometime I also see the following:
>
> 13:44:43,740 INFO  
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
> clientId=producer-26, transactionalId=Source: playerSession and 
> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
> 13:44:44,136 INFO  
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
> clientId=producer-26, transactionalId=Source: playerSession and 
> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 
> 11
> 13:44:44,147 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 0 has no restore state.
>
> Now since this isn't an error the job doesn't crash while running and data 
> does get written to Kafka even with this message. However it does seem wrong 
> to me and I'm wondering if anyone has any insight into why this is happening?
>
> I'm attaching a GIST with the complete log from the application, I ran the 
> job with *env.setParallelism(1)* but I still get around 26 producers created 
> which still seems odd to me. Running without any parallelism set creates 
> about 300-400 producers (based of the clientIds reported)
>
> Thankful for any insight into this!
>
> Best regards,
>
> Tim
>
>

-- 

*Tim Josefsson*
mobil   +46 (0) 707 81 91 12
telefon +46 (0) 8 21 40 70

tim.josefs...@webstep.se
*webstep.se *
Suttungs gränd 2
753 19 Uppsala
Stockholm | Uppsala | Malmö | Sundsvall | Oslo
Bergen | Stavanger | Trondheim | Kristiansand



Random Task executor shutdown

2020-11-16 Thread LINZ, Arnaud
(reposted with proper subject line -- sorry for the copy/paste)
-Original message-
Hello,

I'm running Flink 1.10 on a YARN cluster. I have a streaming application that, 
when under heavy load, fails from time to time with this single error message 
in the whole YARN log:

(...)
2020-11-15 16:18:42,202 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 63 from task 
4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at 
container_e38_1604477334666_0960_01_04 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e38_1604477334666_0960_01_04 because: The TaskExecutor is 
shutting down.
2020-11-15 16:18:55,087 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (7/15) 
(c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

  at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
  - Calculating tasks to restart to recover the failed task 
2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
  - 230 tasks should be restarted to recover the failed task 
2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it 
was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud



L'intégrité de ce message n'étant pas assurée sur internet, la société 
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


split avro kafka field

2020-11-16 Thread Youzha
Hi, I'm a newcomer learning Flink streaming, and I want to split a
Kafka Avro field into multiple fields.

For example, I have this record in my Kafka topic:

{
   id : 12345,
   name : “john”,
   location : “indonesia;jakarta;south jakarta”
}

and then I want to split the location value by “;” into this:


{
   id : 12345,
   name : “john”,
   country : “indonesia”,
   city : “jakarta”,
   locality : “south jakarta”
}

How can I do this with Flink streaming?

need advice.
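
For what it's worth, once the Avro record is deserialized into a POJO this can be
a simple map. All class and field names below are hypothetical; it is only a
sketch of one way to do it:

DataStream<EnrichedRecord> enriched = source.map(new MapFunction<AvroRecord, EnrichedRecord>() {
    @Override
    public EnrichedRecord map(AvroRecord record) {
        // split "indonesia;jakarta;south jakarta" into at most three parts
        String[] parts = record.getLocation().split(";", 3);
        return new EnrichedRecord(
                record.getId(),
                record.getName(),
                parts.length > 0 ? parts[0].trim() : null,  // country
                parts.length > 1 ? parts[1].trim() : null,  // city
                parts.length > 2 ? parts[2].trim() : null); // locality
    }
});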


RE: Re: Flink 1.11 not showing logs

2020-11-16 Thread LINZ, Arnaud
Hello,

I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, 
when under heavy load, fails from time to time with this unique error message 
in the whole yarn log:

(...)
2020-11-15 16:18:42,202 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 63 from task 
4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at 
container_e38_1604477334666_0960_01_04 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e38_1604477334666_0960_01_04 because: The TaskExecutor is 
shutting down.
2020-11-15 16:18:55,087 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (7/15) 
(c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

  at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
  - Calculating tasks to restart to recover the failed task 
2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
  - 230 tasks should be restarted to recover the failed task 
2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it 
was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud



L'intégrité de ce message n'étant pas assurée sur internet, la société 
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: Job crash in job cluster mode

2020-11-16 Thread Tim Eckhardt
Hi Robert, hi Matthias,

 

the job is doing some stateful stream processing (reading data from Kafka) and 
it should run endlessly, so ideally no restarts from time to time.

The TaskManager is the one who is crashing in the end with this kind of 
exception:


    org.apache.kafka.common.errors.DisconnectException: null
    INFO  org.apache.kafka.clients.FetchSessionHandler [] - 
[Consumer clientId=consumer-flink-consumer-8, groupId=flink-consumer] Error 
sending fetch request (sessionId=338952654, epoch=1) to node 3: {}.

    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"

 

Thanks for the tip to look into the heap dump, I might be doing this when 
running the next experiment.

 

Best regards, Tim

 

From: Robert Metzger 
Date: Thursday, 12. November 2020 at 09:34
To: "matth...@ververica.com" 
Cc: Tim Eckhardt , "user@flink.apache.org" 

Subject: Re: Job crash in job cluster mode

 

Hey Tim,

 

what Is your Flink job doing? Is it restarting from time to time?
Is the JobManager crashing, or the TaskManager?

 

On Tue, Nov 10, 2020 at 6:01 PM Matthias Pohl  wrote:

Hi Tim,

I'm not aware of any memory-related issues being related to the deployment mode 
used. Have you checked the logs for hints? Additionally, you could try to 
extract a heap dump. That might help you in analyzing the cause of the memory 
consumption.

 

The TaskManager and JobManager are logging the effective memory-related 
configuration during startup. You can look out for the "Preconfiguration" 
section in each of the log files to get a drill-down of how much memory is used 
per memory pool.

 

Best,
Matthias

On Tue, Nov 10, 2020 at 3:37 PM Tim Eckhardt  wrote:

Hi there,

 

I have a problem with running a flink job in job cluster mode using flink 
1.11.1 (also tried 1.11.2).

The same job is running well using the session cluster mode as well as using 
flink 1.10.0 in job cluster mode.

 

The job starts and runs for quite some time, but it runs a lot 
slower than in session cluster mode and crashes after running for about an 
hour. I can observe in the Flink dashboard that the JVM heap is constantly at a 
high level and slowly gets closer to the limit (4.13GB in my case), which 
it reaches shortly before the job crashes. 

There is also some G1_Old_Generation garbage collection going on, which I do not 
observe in session mode.

 

GC values after running for about 45min:

 

(Collector, Count, Time)

G1_Young_Generation   1,250  107,937

G1_Old_Generation  322  2,432,362

 

Compared to the GC values of the same job in session cluster mode (after the 
same runtime):

 

G1_Young_Generation   1,920  20,575

G1_Old_Generation  0  0

 

So my vague guess is that it has to be something memory-related, maybe 
configuration-wise.

 

To simplify the setup only one jobmanager and one taskmanager is used. The 
taskmanager has a memory setting of: taskmanager.memory.process.size: 1m 
which should be totally fine for the server. The jobmanager has a defined 
heap_size of 1600m. 

 

Maybe somebody has experienced something like this before?

 

Also, is there a way to export the currently loaded configuration parameters of 
the job- and taskmanagers in a cluster? For example, I can’t see the current 
memory process size of the taskmanager in the Flink dashboard. That way 
I could compare the running and crashing setups more easily (I'm using Docker and 
environment variables for configuration at the moment, which makes it a bit 
harder to debug).

 

Thanks.





Re:Re:Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread 马阳阳
Hi Yang,
After I copied the logic from `YarnLogConfigUtil` into my own deployer (calling 
its logic instead of copying it is probably a better option), the logs now show 
up normally.
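
For reference, roughly what the deployer has to do before creating the YARN
cluster descriptor. The method below is taken from Flink's YarnLogConfigUtil;
since it is an internal utility, please verify the exact name and signature
against the snapshot sources you build from:

String flinkConfDir = "/path/to/flink/conf"; // directory containing log4j.properties
Configuration flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
// looks up the log config file in flinkConfDir and sets
// $internal.yarn.log-config-file, so the container start command later gets the
// -Dlog.file / -Dlog4j.configurationFile options
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfig, flinkConfDir);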


Thanks again for the kind help.







At 2020-11-16 17:28:47, "马阳阳"  wrote:

Hi Yang,
I checked the `YarnLogConfigUtil`, it does some work to set the configuration 
for log.
Should I copy the logic to my deployer?










At 2020-11-16 17:21:07, "马阳阳"  wrote:

Hi Yang,
Thank you for you reply.


I set the value for "$internal.deployment.config-dir" to the Flink 
configuration directory.
And the configuration showed on Flink web UI. But it still not work. So I 
wonder what should 
I set as the value for "$internal.deployment.config-dir"?







At 2020-11-16 16:43:11, "Yang Wang"  wrote:

If you are using your own deployer(aka a java program calls the Flink client 
API to submit Flink jobs),
you need to check the jobmanager configuration in webUI whether 
"$internal.yarn.log-config-file" 
is correctly set. If not, maybe you need to set 
"$internal.deployment.config-dir" in your deployer,
not simply set the FLINK_CONF_DIR environment. Because your deployer needs to 
do some configuration setting
which CliFrontend has done. Please have a try and share more feedback.




Best,
Yang


On Mon, Nov 16, 2020 at 2:47 PM 马阳阳  wrote:

Hi Yang,
We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any 
taskmanager/jobmanager logs.
I have checked the log4j.properties file, and it's in the right format. And the 
FLINK_CONF_DIR is set.
When checking the java dynamic options of task manager, I found that the log 
related options are not
set.
This is the output when ussuing "ps -ef | grep ".


yarn 31049 30974  9 13:57 ?00:03:31 
/usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902 
-XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=134217730b -D 
taskmanager.memory.network.min=134217730b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=402653174b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=dhpdn09-113 
-Dtaskmanager.resource-id=container_1604585185669_635512_01_000713 -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63 
-Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113


My question is, what maybe the problem for this? And any suggestions?


By the way, we submit the program from Java program instead of from the command 
line.


Thanks.


ps: I sent the mail to spark user mail list un-attentionally. So I resent it to 
the Flink user mail list. Sorry for the inconvenience to  @Yang Wang 
















At 2020-11-03 20:56:19, "Yang Wang"  wrote:

You could issue "ps -ef | grep container_id_for_some_tm". And then you will 
find the
following java options about log4j.


-Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties



Best,
Yang


On Mon, Nov 2, 2020 at 11:37 PM Diwakar Jha  wrote:

Sure. I will check that and get back to you. could you please share how to 
check java dynamic options?


Best,
Diwakar


On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:

If you have already updated the log4j.properties, and it still could not work, 
then I
suggest to log in the Yarn NodeManager machine and check the log4j.properties
in the container workdir is correct. Also you could have a look at the java 
dynamic
options are correctly set.


I think it should work if the log4j.properties and java dynamic options are set 
correctly.


BTW, could you share the new yarn logs?


Best,
Yang


On Mon, Nov 2, 2020 at 4:32 PM Diwakar Jha  wrote:





Hi Yang,


Thank you so much for taking a look at the log files. I changed my 
log4j.properties. Below is the actual file that I got from EMR 6.1.0 
distribution of flink 1.11. I observed that it is different from Flink 1.11 
that i downloaded so i changed it. Still I didn't see any logs.


Actual
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file





modified : commented the above and added new logging from actual flink 
application log4.properties file


#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.file=${log.file}
#log4j.appender.file.append=false
#log

Re:Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread 马阳阳
Hi Yang,
I checked `YarnLogConfigUtil`; it does some work to set up the log 
configuration.
Should I copy that logic into my deployer?










At 2020-11-16 17:21:07, "马阳阳"  wrote:

Hi Yang,
Thank you for you reply.


I set the value for "$internal.deployment.config-dir" to the Flink 
configuration directory.
And the configuration showed on Flink web UI. But it still not work. So I 
wonder what should 
I set as the value for "$internal.deployment.config-dir"?







At 2020-11-16 16:43:11, "Yang Wang"  wrote:

If you are using your own deployer(aka a java program calls the Flink client 
API to submit Flink jobs),
you need to check the jobmanager configuration in webUI whether 
"$internal.yarn.log-config-file" 
is correctly set. If not, maybe you need to set 
"$internal.deployment.config-dir" in your deployer,
not simply set the FLINK_CONF_DIR environment. Because your deployer needs to 
do some configuration setting
which CliFrontend has done. Please have a try and share more feedback.




Best,
Yang


On Mon, Nov 16, 2020 at 2:47 PM 马阳阳  wrote:

Hi Yang,
We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any 
taskmanager/jobmanager logs.
I have checked the log4j.properties file, and it's in the right format. And the 
FLINK_CONF_DIR is set.
When checking the java dynamic options of task manager, I found that the log 
related options are not
set.
This is the output when ussuing "ps -ef | grep ".


yarn 31049 30974  9 13:57 ?00:03:31 
/usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902 
-XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=134217730b -D 
taskmanager.memory.network.min=134217730b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=402653174b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=dhpdn09-113 
-Dtaskmanager.resource-id=container_1604585185669_635512_01_000713 -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63 
-Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113


My question is, what maybe the problem for this? And any suggestions?


By the way, we submit the program from Java program instead of from the command 
line.


Thanks.


ps: I sent the mail to spark user mail list un-attentionally. So I resent it to 
the Flink user mail list. Sorry for the inconvenience to  @Yang Wang 
















At 2020-11-03 20:56:19, "Yang Wang"  wrote:

You could issue "ps -ef | grep container_id_for_some_tm". And then you will 
find the
following java options about log4j.


-Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties



Best,
Yang


On Mon, Nov 2, 2020 at 11:37 PM Diwakar Jha  wrote:

Sure. I will check that and get back to you. could you please share how to 
check java dynamic options?


Best,
Diwakar


On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:

If you have already updated the log4j.properties, and it still could not work, 
then I
suggest to log in the Yarn NodeManager machine and check the log4j.properties
in the container workdir is correct. Also you could have a look at the java 
dynamic
options are correctly set.


I think it should work if the log4j.properties and java dynamic options are set 
correctly.


BTW, could you share the new yarn logs?


Best,
Yang


On Mon, Nov 2, 2020 at 4:32 PM Diwakar Jha  wrote:





Hi Yang,


Thank you so much for taking a look at the log files. I changed my log4j.properties.
Below is the actual file that I got from the EMR 6.1.0 distribution of Flink 1.11. I
observed that it is different from the Flink 1.11 that I downloaded, so I changed it.
Still, I didn't see any logs.


Actual
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file





Modified: commented out the above and added the new logging configuration from the
actual Flink application log4j.properties file


#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.file=${log.file}
#log4j.appender.file.append=false
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
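
For reference, the log4j2-style property syntax that Flink 1.11 ships with (and that
the "modified" file switches to) looks roughly like the sketch below. This is
reconstructed from memory, so verify it against conf/log4j.properties in the Flink
1.11 distribution before relying on it.

# New log4j2-style syntax (sketch; verify against the Flink 1.11 distribution)
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = File
appender.main.append = false
appender.main.fileName = ${sys:log.file}
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF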


Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread Yang Wang
If you are using your own deployer (i.e., a Java program that calls the Flink client
API to submit Flink jobs), you need to check in the JobManager configuration shown in
the web UI whether "$internal.yarn.log-config-file" is correctly set. If not, you may
need to set "$internal.deployment.config-dir" in your deployer rather than simply
setting the FLINK_CONF_DIR environment variable, because your deployer needs to do
some of the configuration setup that CliFrontend normally does. Please have a try and
share more feedback.


Best,
Yang

马阳阳 wrote on Mon, Nov 16, 2020 at 2:47 PM:

> Hi Yang,
> We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any
> taskmanager/jobmanager logs.
> I have checked the log4j.properties file, and it's in the right format.
> And the FLINK_CONF_DIR is set.
> When checking the java dynamic options of task manager, I found that the
> log related options are not
> set.
> This is the output when issuing "ps -ef | grep ".
>
> yarn 31049 30974  9 13:57 ?00:03:31
> /usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902
> -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=134217730b -D
> taskmanager.memory.network.min=134217730b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=402653174b -D
> taskmanager.memory.task.off-heap.size=0b --configDir .
> -Djobmanager.rpc.address=dhpdn09-113
> -Dtaskmanager.resource-id=container_1604585185669_635512_01_000713
> -Dweb.port=0
> -Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63
> -Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113
>
> My question is: what may be the problem here? Any suggestions?
>
> By the way, we submit the program from a Java program instead of from the
> command line.
>
> Thanks.
>
> PS: I sent the mail to the Spark user mailing list unintentionally, so I resent
> it to the Flink user mailing list. Sorry for the inconvenience to @Yang Wang
>
>
>
>
>
>
>
> At 2020-11-03 20:56:19, "Yang Wang"  wrote:
>
> You could issue "ps -ef | grep container_id_for_some_tm". Then you will find
> the following Java options related to log4j.
>
>
> -Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
>
> Best,
> Yang
>
> Diwakar Jha wrote on Mon, Nov 2, 2020 at 11:37 PM:
>
>> Sure, I will check that and get back to you. Could you please share how
>> to check the Java dynamic options?
>>
>> Best,
>> Diwakar
>>
>> On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:
>>
>>> If you have already updated the log4j.properties and it still does not work,
>>> then I suggest logging in to the YARN NodeManager machine and checking that
>>> the log4j.properties in the container workdir is correct. You could also
>>> check whether the Java dynamic options are set correctly.
>>>
>>> I think it should work if the log4j.properties and java dynamic options
>>> are set correctly.
>>>
>>> BTW, could you share the new yarn logs?
>>>
>>> Best,
>>> Yang
>>>
>>> Diwakar Jha wrote on Mon, Nov 2, 2020 at 4:32 PM:
>>>


 Hi Yang,

 Thank you so much for taking a look at the log files. I changed my
 log4j.properties. Below is the actual file that I got from the EMR 6.1.0
 distribution of Flink 1.11. I observed that it is different from the Flink 1.11
 that I downloaded, so I changed it. Still, I didn't see any logs.

 *Actual*
 log4j.rootLogger=INFO,file

 # Log all infos in the given file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

 # suppress the irrelevant (wrong) warnings from the netty channel
 handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file


 *Modified:* commented out the above and added the new logging configuration from
 the actual Flink application log4j.properties file

 #log4j.rootLogger=INFO,file

 # Log all infos in the given file
 #log4j.appender.file=org.apache.log4j.FileAppender
 #log4j.appender.file.file=${log.file}
 #log4j.appender.file.append=false
 #log4j.appender.file.layout=org.apache.log4j.PatternLayout
 #log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

 # suppress the irrelevant (wrong) warnings from the netty channel
 handler
 #log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file

 # This affects logging for both user code and Flink
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = MainAppender

 # Unc