Re: Does Flink support TFRecordFileOutputFormat?

2020-07-12 Thread Peidian Li
Thanks, I'll check it out.

Jingsong Li wrote on Mon, Jul 13, 2020 at 2:50 PM:

> Hi,
>
> Flink also has `HadoopOutputFormat`, which can wrap a Hadoop OutputFormat as a
> Flink sink.
> You can give it a try.
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 2:34 PM 殿李  wrote:
>
>> Hi,
>>
>> Yes, TF means TensorFlow.
>>
>> This class may not be in the Spark package itself, but Spark supports writing
>> this file format to HDFS.
>>
>> tfRDD.saveAsNewAPIHadoopFile(output,
>>     "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
>>     keyClass="org.apache.hadoop.io.BytesWritable",
>>     valueClass="org.apache.hadoop.io.NullWritable")
>>
>>
>>
>> > On Jul 13, 2020 at 2:21 PM, Danny Chan wrote:
>> >
>> > I didn't see any class named TFRecordFileOutputFormat in Spark; by TF
>> > do you mean TensorFlow?
>> >
>> > Best,
>> > Danny Chan
>> > On Jul 10, 2020 at 5:28 PM +0800, 殿李 wrote:
>> >> Hi,
>> >>
>> >> Does Flink support TFRecordFileOutputFormat? I can't find the relevant
>> >> information in the documentation.
>> >>
>> >> As far as I know, Spark supports it.
>> >>
>> >>
>> >> Best regards
>> >> Peidian Li
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
---
Best Regards
Peidian Li


Re: Does Flink support TFRecordFileOutputFormat?

2020-07-12 Thread Jingsong Li
Hi,

Flink also has `HadoopOutputFormat`, which can wrap a Hadoop OutputFormat as a
Flink sink.
You can give it a try.
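
For reference, here is a minimal, untested sketch of that approach for writing
TFRecords from the DataSet API (it assumes flink-hadoop-compatibility and the
tensorflow-hadoop jar are on the classpath; the class name, sample records, and
output path are placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.tensorflow.hadoop.io.TFRecordFileOutputFormat;

public class TFRecordSinkSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Each element becomes one TFRecord entry; in practice the bytes would be
    // serialized tf.Example protos rather than plain strings.
    DataSet<Tuple2<BytesWritable, NullWritable>> records = env
        .fromElements("example-1", "example-2")
        .map(new MapFunction<String, Tuple2<BytesWritable, NullWritable>>() {
          @Override
          public Tuple2<BytesWritable, NullWritable> map(String s) {
            return Tuple2.of(new BytesWritable(s.getBytes()), NullWritable.get());
          }
        });

    // Wrap the TensorFlow Hadoop OutputFormat so Flink can use it as a sink.
    Job job = Job.getInstance();
    FileOutputFormat.setOutputPath(job, new Path("hdfs:///tmp/tfrecords"));
    HadoopOutputFormat<BytesWritable, NullWritable> tfRecordSink =
        new HadoopOutputFormat<>(new TFRecordFileOutputFormat(), job);

    records.output(tfRecordSink);
    env.execute("write tfrecords");
  }
}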

Best,
Jingsong

On Mon, Jul 13, 2020 at 2:34 PM 殿李  wrote:

> Hi,
>
> Yes, TF means TensorFlow.
>
> This class may not be in the Spark package itself, but Spark supports writing
> this file format to HDFS.
>
> tfRDD.saveAsNewAPIHadoopFile(output,
>     "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
>     keyClass="org.apache.hadoop.io.BytesWritable",
>     valueClass="org.apache.hadoop.io.NullWritable")
>
>
>
> > On Jul 13, 2020 at 2:21 PM, Danny Chan wrote:
> >
> > I didn't see any class named TFRecordFileOutputFormat in Spark; by TF
> > do you mean TensorFlow?
> >
> > Best,
> > Danny Chan
> > On Jul 10, 2020 at 5:28 PM +0800, 殿李 wrote:
> >> Hi,
> >>
> >> Does Flink support TFRecordFileOutputFormat? I can't find the relevant
> >> information in the documentation.
> >>
> >> As far as I know, Spark supports it.
> >>
> >>
> >> Best regards
> >> Peidian Li
>
>

-- 
Best, Jingsong Lee


Re: Does Flink support TFRecordFileOutputFormat?

2020-07-12 Thread 殿李
Hi,

Yes, TF means TensorFlow.

This class may not be in the Spark package itself, but Spark supports writing this
file format to HDFS.

tfRDD.saveAsNewAPIHadoopFile(output,
    "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
    keyClass="org.apache.hadoop.io.BytesWritable",
    valueClass="org.apache.hadoop.io.NullWritable")



> On Jul 13, 2020 at 2:21 PM, Danny Chan wrote:
> 
> I didn't see any class named TFRecordFileOutputFormat in Spark; by TF do you
> mean TensorFlow?
> 
> Best,
> Danny Chan
> On Jul 10, 2020 at 5:28 PM +0800, 殿李 wrote:
>> Hi,
>> 
>> Does Flink support TFRecordFileOutputFormat? I can't find the relevant
>> information in the documentation.
>> 
>> As far as I know, Spark supports it.
>> 
>> 
>> Best regards
>> Peidian Li



Re: Table API throws "No FileSystem for scheme: file" when loading local parquet

2020-07-12 Thread Danny Chan
> No FileSystem for scheme: file

It seems that your path does not work correctly; from the path you gave, the
directory name 'test.parquet' seems invalid.

Best,
Danny Chan
On Jul 11, 2020 at 8:07 AM +0800, Danny Chan wrote:
>
> It seems that your path does not work correctly; from the path you gave, the
> directory name 'test.parquet' seems invalid.


Re: Does Flink support TFRecordFileOutputFormat?

2020-07-12 Thread Danny Chan
I didn't see any class named TFRecordFileOutputFormat in Spark; by TF do you
mean TensorFlow?

Best,
Danny Chan
On Jul 10, 2020 at 5:28 PM +0800, 殿李 wrote:
> Hi,
>
> Does Flink support TFRecordFileOutputFormat? I can't find the relevant
> information in the documentation.
>
> As far as I know, Spark supports it.
>
>
> Best regards
> Peidian Li


Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread Lian Jiang
Thanks Leonard and Jark.

Here is my repo so you can reproduce the issue:
https://bitbucket.org/jiangok/flink-playgrounds/src/0d242a51f02083711218d3810267117e6ce4260c/table-walkthrough/pom.xml#lines-131.
As you can see, my pom.xml has already added flink-avro and avro
dependencies.

You can run this repro by:

git clone g...@bitbucket.org:jiangok/flink-playgrounds.git
cd flink-playgrounds/table-walkthrough
. scripts/ops.sh  # this script has some helper commands.
rebuild   # this will build artifacts, docker and run.
log jobmanager  # this will print job manager log which has the exception.

I appreciate your help very much!





On Sun, Jul 12, 2020 at 8:00 PM Leonard Xu  wrote:

> Hi, Jiang
>
> Is there an uber jar or a list of runtime dependencies so that developers can
> easily make the above example of Flink SQL for Avro work? Thanks.
>
>
> The dependency list for using Avro in Flink SQL is simple, and AFAIK there is no
> uber jar; we only need to add the `flink-avro` and `avro` dependencies. The
> `avro` dependency is managed, which means you do not need to add it if your
> dependency list already contains an `avro` dependency. I wrote a simple
> demo[1]; hope it can help you.
>
> Best,
> Leonard Xu
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/sql-avro/pom.xml#L32
>
>
>
>
>





Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread Leonard Xu
Hi, Jiang
> Is there an uber jar or a list of runtime dependencies so that developers can
> easily make the above example of Flink SQL for Avro work? Thanks.

The dependency list for using Avro in Flink SQL is simple, and AFAIK there is no uber
jar; we only need to add the `flink-avro` and `avro` dependencies. The `avro`
dependency is managed, which means you do not need to add it if your dependency
list already contains an `avro` dependency. I wrote a simple demo[1]; hope it can
help you.

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/sql-avro/pom.xml#L32 




 

Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread Jark Wu
From the latest exception message, it seems that the avro factory problem
has been resolved.
The new exception indicates that you don't have proper Apache Avro
dependencies (because flink-avro doesn't bundle Apache Avro),
so you have to add Apache Avro to your project dependencies, or
export HADOOP_CLASSPATH if Hadoop is installed in your environment.


<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>


Best,
Jark

On Mon, 13 Jul 2020 at 03:04, Lian Jiang  wrote:

> Thanks guys.
>
> I missed the runtime dependencies. After adding the lines below to
> https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/Dockerfile,
> the original issue of "Could not find any factory for identifier" is gone.
>
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.0/flink-avro-1.11.0.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar;
>
>
> However, I got various NoSuchMethodExceptions related to 
> JsonNode/JsonNull/GenericRecord/...  The most recent exception is:
>
>  java.lang.RuntimeException: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> jobmanager_1  | at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) 
> ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) 
> ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) 
> ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
>  ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>  ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) 
> ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) 
> ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) 
> ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1  | at 
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>  ~[flink-avro-1.11.0.jar:1.11.0]
> jobmanager_1  | at 
> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:200)
>  ~[flink-avro-1.11.0.jar:1.11.0]
> jobmanager_1  | at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>  ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1  | at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>  ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1  | at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>  ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1  | at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>  ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> Is there an uber jar or a list of runtime dependencies so that developers can 
> easily make the above example of Flink SQL for Avro work? Thanks.
>
>
>
>
>
> On Sat, Jul 11, 2020 at 11:39 PM Leonard Xu  wrote:
>
>> Hi, Jiang
>>
>>
>> jobmanager_1  | Available factory identifiers are:
>> jobmanager_1  |
>> jobmanager_1  | csv
>> jobmanager_1  | json
>> jobmanager_1  | parquet
>>
>>
>> After adding the flink-avro dependency, did you restart your
>> cluster/sql-client? From the log, it looks like the flink-avro dependency
>> did not load properly.
>>
>>
>> Best,
>> Leonard Xu
>>
>
>
> --
>
> 
>


Error --GC Cleaner Provider -- Flink 1.11.0

2020-07-12 Thread Murali Krishna Pusala


Hi All,

I have written simple Java code that reads data using Hive and transforms it using 
the Table API (Blink planner) with Flink 1.11.0 on an HDP cluster. I am encountering 
a "java.lang.Error: Failed to find GC Cleaner among available providers" error. 
The full error stack is at the end of the email.

Has anyone encountered the same issue, or does anyone have any solutions/suggestions?

Cluster Config:

* Hadoop Version: 2.7.3
* Java Version: 1.8.0_40
* Flink 1.11.0 ( built from source)
* Hive 1.2.1

Thanks
Murali Pusala


Caused by: java.lang.Error: Failed to find GC Cleaner among available 
providers: [Legacy (before Java 9) cleaner provider, New Java 9+ cleaner 
provider]
at 
org.apache.flink.util.JavaGcCleanerWrapper.findGcCleanerManager(JavaGcCleanerWrapper.java:149)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper.(JavaGcCleanerWrapper.java:56)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.core.memory.MemoryUtils.createMemoryGcCleaner(MemoryUtils.java:111)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory(MemorySegmentFactory.java:175)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.memory.MemoryManager.lambda$allocatePages$0(MemoryManager.java:237)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.memory.MemoryManager$$Lambda$188/978816879.apply(Unknown
 Source) ~[?:?]
at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) 
~[?:1.8.0_40]
at 
org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:233)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82)
 ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
 ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:184)
 ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:148)
 ~[flink-table-uber-blink_2.11-1.11.0.jar:1.11.0]
at LocalHashAggregateWithKeys$100.open(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$176/1800425807.run(Unknown
 Source) ~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_40]
Caused by: org.apache.flink.util.FlinkRuntimeException: Legacy (before Java 9) 
cleaner provider: Failed to find Reference#tryHandlePending method
at 
org.apache.flink.util.JavaGcCleanerWrapper$ReflectionUtils.findMethod(JavaGcCleanerWrapper.java:398)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper$ReflectionUtils.access$1300(JavaGcCleanerWrapper.java:376)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider.createPendingCleanersRunner(JavaGcCleanerWrapper.java:326)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider.access$800(JavaGcCleanerWrapper.java:303)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper$CleanerProvider.createCleanerManager(JavaGcCleanerWrapper.java:180)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper$CleanerProvider.access$400(JavaGcCleanerWrapper.java:162)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.util.JavaGcCleanerWrapper.findGcCleanerManager(JavaGcCleanerWrapper.java:140)
 ~[flink-dist_2.11-1.11.0.jar:1.

Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread Lian Jiang
Thanks guys.

I missed the runtime dependencies. After adding the lines below to
https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/Dockerfile,
the original issue of "Could not find any factory for identifier" is gone.

wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.0/flink-avro-1.11.0.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar;


However, I got various NoSuchMethodExceptions related to
JsonNode/JsonNull/GenericRecord/...  The most recent exception is:

 java.lang.RuntimeException: java.lang.NoSuchMethodException:
org.apache.avro.generic.GenericRecord.<init>()
jobmanager_1  | at
org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1  | at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
~[flink-avro-1.11.0.jar:1.11.0]
jobmanager_1  | at
org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:200)
~[flink-avro-1.11.0.jar:1.11.0]
jobmanager_1  | at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1  | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1  | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1  | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.0.jar:1.11.0]

Is there an uber jar or a list of runtime dependencies so that
developers can easily make the above example of Flink SQL for Avro
work? Thanks.





On Sat, Jul 11, 2020 at 11:39 PM Leonard Xu  wrote:

> Hi, Jiang
>
>
> jobmanager_1  | Available factory identifiers are:
> jobmanager_1  |
> jobmanager_1  | csv
> jobmanager_1  | json
> jobmanager_1  | parquet
>
>
> After adding the flink-avro dependency, did you restart your
> cluster/sql-client? From the log, it looks like the flink-avro dependency
> did not load properly.
>
>
> Best,
> Leonard Xu
>





Re: History Server Not Showing Any Jobs - File Not Found?

2020-07-12 Thread Chesnay Schepler
Ah, I remembered wrong, my apologies. Unfortunately there is no option 
to prevent the cleanup; it is something I wanted to have for a long time 
though...


On 11/07/2020 17:57, Hailu, Andreas wrote:


Thanks for the clarity. To this point you made:

/(Note that by configuring "historyserver.web.tmpdir" to some 
permanent directory, subsequent (re)starts of the HistoryServer can 
re-use this directory; so you only have to download things once)/


The HistoryServer process in fact deletes this local cache during its 
shutdown hook. Is there a setting we can use so that it doesn’t do this?


2020-07-11 11:43:29,527 [HistoryServer shutdown hook] INFO 
HistoryServer - *Removing web dashboard root cache directory 
/local/scratch/flink_historyserver_tmpdir*


2020-07-11 11:43:29,536 [HistoryServer shutdown hook] INFO 
HistoryServer - Stopped history server.


We’re attempting to work around the UI becoming unresponsive/crashing 
the browser at a large number of archives (in my testing, that’s around 
20,000 archives with Chrome) by persisting the job IDs of our 
submitted apps and then navigating to the job overview page directly, 
e.g. http://(host):(port)/#/job/(jobId)/overview. It would have been 
really great if the server stored archives by the application ID 
rather than the job ID – particularly for apps that potentially submit 
hundreds of jobs. Tracking one application ID (à la Spark) would ease 
the burden on the dev + ops side. Perhaps a feature for the future :)


*// *ah**

*From:*Chesnay Schepler 
*Sent:* Tuesday, June 2, 2020 3:55 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

1) It downloads all archives and stores them on disk; the only thing 
stored in memory is the job ID of the archive. There is no hard upper 
limit; it is mostly constrained by disk space / memory. I say mostly, 
because I'm not sure how well the WebUI handles 100k jobs being loaded 
into the overview.


2) No, there is no retention policy. It is currently expected that an 
external process cleans up archives. If an archive was deleted (from 
the archive directory) the HistoryServer does notice that and also 
deletes the local copy.


On 01/06/2020 23:05, Hailu, Andreas wrote:

So I created a new HDFS directory with just 1 archive and pointed
the server to monitor that directory, et voilà – I’m able to see
the applications in the UI. So it must have been really churning
trying to fetch all of those initial archives :)

I have a couple of follow up questions if you please:

1. What is the upper limit of the number of archives the history
server can support? Does it attempt to download every archive and
load them all into memory?

2. Retention: we have on the order of 100K applications per day in
our production environment. Is there any native retention
policy? E.g. only keep the latest X archives in the dir - or is
this something we need to manage ourselves?

Thanks.

*// *ah

*From:*Hailu, Andreas [Engineering]
*Sent:* Friday, May 29, 2020 8:46 AM
*To:* 'Chesnay Schepler' 
; user@flink.apache.org

*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Yes, these are all in the same directory, and we’re at 67G right
now. I’ll try with incrementally smaller directories and let you
know what I find.

*// *ah

*From:* Chesnay Schepler <ches...@apache.org>
*Sent:* Friday, May 29, 2020 3:11 AM
*To:* Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

oh I'm not using the HistoryServer; I just wrote it ;)

Are these archives all in the same location? So we're roughly
looking at 5 GB of archives then?

That could indeed "just" be a resource problem. The HistoryServer
eagerly downloads all archives, and not on-demand.

The next step would be to move some of the archives into a
separate HDFS directory and try again.

(Note that by configuring "historyserver.web.tmpdir" to some
permanent directory, subsequent (re)starts of the HistoryServer
can re-use this directory; so you only have to download things once)

On 29/05/2020 00:43, Hailu, Andreas wrote:

May I also ask what version of flink-hadoop you’re using and
the number of jobs you’re storing the history for? As of
writing we have roughly 101,000 application history files. I’m
curious to know if we’re encountering some kind of resource
problem.

*// *ah

*From:*Hailu, Andreas [Engineering]
*Sent:* Thursday, May 28, 2020 12:18 PM
*To:* 'Chesnay Schepler' 
; user@flink.apache.org

   

Re: Customised RESTful trigger

2020-07-12 Thread Chesnay Schepler
You can specify arguments to your job via query parameters or a JSON 
body (recommended), as documented here.


On 10/07/2020 18:48, Jacek Grzebyta wrote:

Hello,

I am a newbie in the Apache Flink environment. I found it is possible 
to trigger a job using the MONITORING REST API. Is it possible to 
customise a request to start a job with some parameters? From the 
bigger perspective, I would like to provide a large file URL to a 
Flink application to do a TFL job.


For example after request:

/job?inputFile=s3://my-bucket/input/input-600m.json

Flink will start the FTL job on an instance. Independently, if the service 
receives another query:


/job?inputFile=s3://my-bucket/input/input-other2G.json

Flink would start the other processing on the other job instance.

I thought I could deploy the jar file with parameters, but that would 
be quite weird.
I have no idea how I can solve that without first converting a REST request 
into a stream event, which would be the simplest.


Regards,
Jacek





Re: Table API jobs migration to Flink 1.11

2020-07-12 Thread godfrey he
hi Flavio,

`BatchTableSource` can only be used with the old planner.
If you want to use the Blink planner to run a batch job,
your table source should implement `StreamTableSource`
and its `isBounded` method should return true.
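
For illustration, a rough sketch of such a bounded source built around an existing
InputFormat (class and field names here are placeholders; this uses the legacy
TableSource interfaces, which are deprecated in favor of the new DynamicTableSource
API linked in the quoted message below):

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MyBoundedTableSource implements StreamTableSource<Row> {

  private final InputFormat<Row, ?> inputFormat;
  private final TableSchema schema;
  private final TypeInformation<Row> rowType;

  public MyBoundedTableSource(InputFormat<Row, ?> inputFormat,
                              TableSchema schema,
                              TypeInformation<Row> rowType) {
    this.inputFormat = inputFormat;
    this.schema = schema;
    this.rowType = rowType;
  }

  @Override
  public boolean isBounded() {
    return true; // lets the Blink planner treat this source as batch input
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // Reuse the existing InputFormat to produce the (bounded) stream of rows.
    return execEnv.createInput(inputFormat, rowType);
  }

  @Override
  public TypeInformation<Row> getReturnType() {
    return rowType;
  }

  @Override
  public TableSchema getTableSchema() {
    return schema;
  }
}

If I remember correctly, it can then be registered via
tableEnv.registerTableSource("myTable", new MyBoundedTableSource(...)),
which is deprecated but still available in 1.11.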

Best,
Godfrey



Flavio Pompermaier wrote on Fri, Jul 10, 2020 at 10:32 PM:

> Is it correct to do something like this?
>
> TableSource myTableSource = new BatchTableSource() {
>   @Override
>   public TableSchema getTableSchema() {
> return new TableSchema(dsFields, ft);
>   }
>   @Override
>   public DataSet getDataSet(ExecutionEnvironment execEnv) {
> return execEnv.createInput(myInputformat);
>   }
> };
>
> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier 
> wrote:
>
>> How can you reuse an InputFormat to write a TableSource? I think that at
>> least initially this could be the simplest way to test the migration... then
>> I could try to implement the new TableSource interface.
>>
>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he  wrote:
>>
>>> hi Flavio,
>>> Only old planner supports BatchTableEnvironment (which can convert
>>> to/from DataSet),
>>> while Blink planner in batch mode only support TableEnvironment. Because
>>> Blink planner
>>> convert the batch queries to Transformation (corresponding to
>>> DataStream), instead of DataSet.
>>>
>>> one approach is you can migrate them to TableSource instead (InputFormat
>>> can be reused),
>>> but TableSource will be deprecated later. you can try new table source[1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier wrote on Fri, Jul 10, 2020 at 8:54 PM:
>>>
 Thanks but I still can't understand how to migrate my legacy code. The
 main problem is that I can't create a BatchTableEnv anymore so I can't
 call createInput.

 Is there a way to reuse InputFormats? Should I migrate them to
 TableSource instead?

 public static void main(String[] args) throws Exception {
 ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment btEnv =
 TableEnvironment.getTableEnvironment(env);
 MyInputFormat myInputformat =  new MyInputFormat(dsFields,
 ft).finish();
 DataSet rows = env.createInput(myInputformat);
 Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
 CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t",
 1, WriteMode.OVERWRITE);
 btEnv.registerTableSink("out", dsFields, ft, outSink);
 btEnv.insertInto(table, "out", btEnv.queryConfig());
 env.execute();
   }

 On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
 dwysakow...@apache.org> wrote:

> You should be good with using the TableEnvironment. The
> StreamTableEnvironment is needed only if you want to convert to
> DataStream. We do not support converting batch Table programs to
> DataStream yet.
>
> A following code should work:
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
>
> TableEnvironment.create(settings);
>
> Best,
>
> Dawid
>
> On 10/07/2020 11:48, Flavio Pompermaier wrote:
> > Hi to all,
> > I was trying to update my legacy code to Flink 1.11. Before I was
> > using a BatchTableEnv and now I've tried to use the following:
> >
> > EnvironmentSettings settings =
> > EnvironmentSettings.newInstance().inBatchMode().build();
> >
> > Unfortunately in the StreamTableEnvironmentImpl code there's :
> >
> > if (!settings.isStreamingMode()) {
> > throw new TableException(
> > "StreamTableEnvironment can not run in batch mode for now, please use
> > TableEnvironment.");
> > }
> >
> > What should I do here?
> >
> > Thanks in advance,
> > Flavio
>
>



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-12 Thread Si-li Liu
Someone told me that maybe this issue is Mesos-specific. I'm kind of a
newbie in Flink, and I dug into the code but could not reach a conclusion.
Here I just want a better join window that emits the result and deletes
it from the window state immediately when the join succeeds; is there any
other way? Thanks!

Congxian Qiu wrote on Sat, Jul 11, 2020 at 3:20 PM:

> Hi Si-li
>
> Thanks for the notice.
> I just want to double-check: has the original problem been solved? I found
> that the created issue FLINK-18464 has been closed with the reason "cannot
> reproduce". Am I missing something here?
>
> Best,
> Congxian
>
>
> Si-li Liu wrote on Fri, Jul 10, 2020 at 6:06 PM:
>
>> Sorry
>>
>> I can't reproduce it with reduce/aggregate/fold/apply, and due to some
>> limitations in my working environment, I can't use Flink 1.10 or 1.11.
>>
>> Congxian Qiu wrote on Sun, Jul 5, 2020 at 6:21 PM:
>>
>>> Hi
>>>
>>> First, could you please check whether this problem is still there if you
>>> use Flink 1.10 or 1.11?
>>>
>>> It seems strange: from the error message, there is an error when trying
>>> to convert a non-window state (VoidNamespace) to a window state (the serializer
>>> is the serializer of the window state, but the state is a non-window state).
>>> Could you please try to replace MyFunction with a 
>>> reduce/aggregate/fold/apply()
>>> function to see what happens? This is to narrow down the problem.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Si-li Liu wrote on Fri, Jul 3, 2020 at 6:44 PM:
>>>
 Thanks for your help

 1. I started the job from scratch, not a savepoint or externalized
 checkpoint
 2. No job graph change
 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 4. My Flink version is 1.9.1

 Khachatryan Roman wrote on Fri, Jul 3, 2020 at 4:49 PM:

> I still wasn't able to reproduce the issue.
>
> Can you also clarify:
> - Are you starting the job from a savepoint or externalized
> checkpoint?
> - If yes, was the job graph changed?
> - What StreamTimeCharacteristic is set, if any?
> - What exact version of Flink do you use?
>
> Regards,
> Roman
>
>
> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu  wrote:
>
>> Hi, Thanks for your help.
>>
>> The checkpoint configuration is
>>
>> checkpoint.intervalMS=30
>> checkpoint.timeoutMS=30
>>
>> The error call stack is from the JM's log, and it happens on every checkpoint.
>> Currently I don't have a successful checkpoint yet.
>>
>> Khachatryan Roman wrote on Fri, Jul 3, 2020 at 3:50 AM:
>>
>>> Hi,
>>>
>>> Thanks for the details.
>>> However, I was not able to reproduce the issue. I used parallelism
>>> levels 4, file system backend and tried different timings for
>>> checkpointing, windowing and source.
>>> Do you encounter this problem deterministically, is it always 1st
>>> checkpoint?
>>> What checkpointing interval do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu  wrote:
>>>
 Hi, this is our production code, so I have to modify it a little
 bit, such as variable names and function names. I think the 3 classes I
 provide here are enough.

 I try to join two streams, but I don't want to use the default join
 function, because I want to send the joined log immediately and remove it
 from the window state immediately. And my window gap time is very long (20
 minutes), so it may be evaluated multiple times.

 class JoinFunction extends
   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{

   var ueState: ValueState[RawLog] = _
   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
   val invalidCounter = new LongCounter()
   val processCounter = new LongCounter()
   val sendToKafkaCounter = new LongCounter()

   override def open(parameters: Configuration): Unit = {
 ueState = getRuntimeContext.getState(
   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
 )
 gZipThriftSerializer = new GZipThriftSerializer[MyType]()
 getRuntimeContext.addAccumulator("processCounter", 
 this.processCounter)
 getRuntimeContext.addAccumulator("invalidCounter", 
 this.invalidCounter)
 getRuntimeContext.addAccumulator("sendToKafkaCounter", 
 this.sendToKafkaCounter)
   }

   override def process(key: String,
ctx: Context,
logs: Iterable[RawLog],
out: Collector[OutputLog]): Unit = {
 if (ueState.value() != null) {
   processCounter.add(1L)
   val bid = ueState.value()
   val bidLog = 
 gZipThriftSerializer.decompressAndDeserialize(bid.payload, 
 classOf

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-12 Thread Ori Popowski
> AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
can reduce the state size. If this cannot be done using the window
operator, would the KeyedProcessFunction[1] be OK for you?

I'll see if I can introduce it to the code.

> if you do, the ProcessWindowFunction is getting as argument an Iterable
with ALL elements collected along the session. This will make the state per
key potentially huge (like you're experiencing).

Thanks for noticing that. It's indeed true that we do this. The reason is
the nature of the computation, which cannot be done incrementally
unfortunately. It's not a classic avg(), max(), last() etc. computation
which can be reduced in each step.
I'm thinking of a way to cap the volume of the state per key using an
aggregate function that limits the number of elements and returns a list of
the collected events.

class CappingAggregator(limit: Int) extends AggregateFunction[Event,
Vector[Event], Vector[Event]] {
  override def createAccumulator(): Vector[Event] = Vector.empty

  override def add(value: Event, acc: Vector[Event]): Vector[Event] =
if (acc.size < limit) acc :+ value
else acc

  override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc:
_*)

  override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
(a ++ b).slice(0, limit)
}

My only problem is with merge(). I'm not sure if b is always later elements
than a's or if I must sort and only then slice.

On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch  wrote:

> Hi Ori,
>
> In your code, are you using the process() API?
>
> .process(new MyProcessWindowFunction());
>
> if you do, the ProcessWindowFunction is getting as argument an Iterable
> with ALL elements collected along the session. This will make the state per
> key potentially huge (like you're experiencing).
>
> As Aljoscha Krettek suggested in the JIRA, you can use the aggregate()
> API and store in state only an aggregate that is getting incrementally
> updated on every incoming event (this could be ONE Class / Map / Tuple /
> etc.) rather than keeping ALL elements.
>
> See example here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>
> Thanks,
> Rafi
>
>
> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu 
> wrote:
>
>> Hi Ori
>>
>> AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
>> can reduce the state size. If this cannot be done using the window
>> operator, would the KeyedProcessFunction[1] be OK for you?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>
>> Best,
>> Congxian
>>
>>
>> Ori Popowski wrote on Wed, Jul 8, 2020 at 8:30 PM:
>>
>>> I've asked this question in
>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
>>> for two years so I'm not sure it will be visible.
>>>
>>> While creating a savepoint I get a 
>>> org.apache.flink.util.SerializedThrowable:
>>> java.lang.NegativeArraySizeException. It's happening because some of my
>>> windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.
>>>
>>> How can I prevent this?
>>>
>>> As I understand it, I need somehow to limit the accumulated size of the
>>> window I'm using, which is EventTimeWindow. However, I have no way of
>>> doing so, because the WindowOperator manages its state on its own.
>>>
>>> Below is a full stack trace.
>>>
>>> org.apache.flink.util.SerializedThrowable: Could not materialize
>>> checkpoint 139 for operator Window(EventTimeSessionWindows(180),
>>> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
>>> Unnamed (23/189).
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>> java.lang.NegativeArraySizeException
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>>> at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>> ... 3 common frames omitted
>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>> at org.rocksdb.RocksIterator.value0(Native Method)
>>> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>>> at
>>> org.apache.flink.contrib