[ANNOUNCE] Apache Flink 1.11.1 released

2020-07-21 Thread Dian Fu
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 
series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/news/2020/07/21/release-1.11.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Dian

Re: Handle idle kafka source in Flink 1.9

2020-07-21 Thread bat man
Hi Team,

Can someone share their experiences handling this.

Thanks.

On Tue, Jul 21, 2020 at 11:30 AM bat man  wrote:

> Hello,
>
> I have a pipeline which consumes data from a Kafka source. Since, the
> partitions are partitioned by device_id in case a group of devices is down
> some partitions will not get normal flow of data.
> I understand from documentation here[1] in flink 1.11 one can declare the
> source idle -
> WatermarkStrategy.>forBoundedOutOfOrderness(Duration.
> ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>
> How can I handle this in 1.9, since I am using aws emr and emr doesn't
> have any release with the latest flink version.
>
> One way I could think of is to trigger watermark generation every 10
> minutes or so using Periodic watermarks. However, this will not be full
> proof, are there any better way to handle this more dynamically.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> Thanks,
> Hemant
>
>


Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
Hi David
thanks for the confirmation, good to know that.
Best,
Congxian


David Magalhães  于2020年7月21日周二 下午11:42写道:

> Hi Congxian, the leftover files were on the local disk of the TaskManager.
> But looking better into the issue, I think the issue was the "logs". The
> sink, in this case, was writing one line into the logger (I was writing 8
> GB in total), and that makes more sense. So nothing wrong with the
> Flink/Savepoint behaviour.
>
> Thanks,
> David
>
> On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu 
> wrote:
>
>> Hi David
>>Sorry for the late reply, seems I missed your previous email.
>>I'm not sure I fully understand here, do the leftover files on s3
>> filesystem or the local disk of Taskmanager?. Currently, the savepoint data
>> will directly write to output stream of the underlying file(here is s3
>> file), you can have a look at the code here[1].
>>
>> [1]
>> https://github.com/apache/flink/blob/1908b2ce6ffb8efc7d339136787494b4fe70846f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160
>>
>> Best,
>> Congxian
>>
>>
>> David Magalhães  于2020年7月21日周二 下午4:10写道:
>>
>>> Hi Till, I'm using s3:// schema, but not sure what was the default used
>>> if s3a or s3p.
>>>
>>> then the state backend should try to directly write to the target file
 system
>>>
>>>
>>> That was the behaviour that I saw the second time I've run this with
>>> more slots. Does the savepoint write directly to S3 via streaming or write
>>> the savepoint to memory first before sending to S3?
>>>
>>> Thanks,
>>> David
>>>
>>> On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann 
>>> wrote:
>>>
 Hi David,

 which S3 file system implementation are you using? If I'm not mistaken,
 then the state backend should try to directly write to the target file
 system. If this should result in temporary files on your TM, then this
 might be a problem of the file system implementation. Having access to the
 logs could also help to better understand whats going on.

 Cheers,
 Till

 On Tue, Jul 14, 2020 at 11:57 AM David Magalhães 
 wrote:

> Hi Congxian, sorry for the late reply.
>
> I'm using the filesystem with an S3 path as the default state backend
> in flink-conf.yml (state.backend: filesystem).
> The Flink version I'm using is 1.10.1.
>
> By "The task manager did not clean up the state", I mean what the
> taskmanager was writing on disk the savepoint file, but it didn't delete 
> it
> after the other taskmanager had an issue with the disk being full. The
> expected scenario would be both taskmanagers remove the savepoint they 
> were
> trying to do from the disk, but only the one that reached 100% disk space
> use did it.
>
> For my scenario, I'm using the Flink REST API to start/deploy jobs. A
> retained checkpoint isn't supported in REST API and even if it was, I 
> think
> it doesn't fit my scenario (stop a job, and start the new one from the
> saved state).
>
> Thanks,
> David
>
> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu 
> wrote:
>
>> Hi David
>>
>> As you say the savepoint use local disk, I assume that you use
>> RocksDBStateBackend.
>> What's the flink version are you using now?
>>
>> What do you mean "The task manager did not clean up the state"?, does
>> that mean the local disk space did not  clean up, do the task encounter
>> failover in this period?
>>
>> The snapshot speed will be limited by the network bandwidth and the
>> local io performance.
>> IIUC, currently only checkpoint support local recovery
>>
>> PS: If you want the snapshot complete quickly, maybe you can try
>> retained checkpoint[1], and multiple threads uploads[2]
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
>> [2] https://issues.apache.org/jira/browse/FLINK-11008
>>
>> Best,
>> Congxian
>>
>>
>> David Magalhães  于2020年7月10日周五 下午7:37写道:
>>
>>> Hi, yesterday when I was creating a savepoint (to S3, around 8GB of
>>> state) using 2 TaskManager (8 GB) and it failed because one of the task
>>> managers fill up the disk (probably didn't have enough RAM to save the
>>> state into S3 directly,I don't know what was the disk space, and reached
>>> 100% usage space and the other one reached 99%).
>>>
>>> After the crash, the task manager that reach 100% deleted the
>>> "failed savepoint" from the local disk but the other one that reached 
>>> 99%
>>> kept it. Shouldn't this task manager also clean up the failed state?
>>>
>>> After cleaning up the disk of that task manager, I've increased the
>>> parallelism to 6, created a new state of 8GB and all went smoothly, but 
>>> it
>>> took 8 minutes to start processing in the new

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

I'm not entirely sure of the semantics between ThreadPoolSize and
MaxConnections since they are all KPL configurations (this specific
question would probably be better directed to AWS),
but my guess would be that the number of concurrent requests to the KPL
backend is capped by MaxConnections. This is per parallel
FlinkKinesisProducer subtask.

As for ThreadPoolSize, do note that the default threading model by KPL is
PER_REQUEST, for which the KPL native process will launch a thread for each
request.
Under heavy load, this would of course be an issue. Since you didn't
explicitly mention this config, make sure to set this to POOLED to actually
make use of a fixed thread pool for requests.

Overall, my suggestion is to set a reasonable queue limit for the number of
records buffered by KPL's native process (by default it is unbounded).
Without that in place, under high load you would easily be resource
exhausted, and can cause more unpredictable checkpointing times since the
FlinkKinesisProducer would need to flush pending records on checkpoints
(which ultimately also applies backpressure upstream).

BR,
Gordon

On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan 
wrote:

> Hi,
> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
> stream(KDS).
> Getting following errors:
> 1.
> Throttling
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
>  
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>
> 2. ERROR
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-06-18 15:49:24.238655] [0x0ed6][0x7fc2086c8700] [error]
> [shard_map.cc:150] Shard map update for stream "_write" failed. Code: 
> *LimitExceededException
> Message: Rate exceeded for stream *..._write under account 753274046439.;
> retrying in 1500 ms
>
> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>
>
> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>
>
> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>
> These are the KPL property changes I am planning to make.
>
> *RequestTimeput*: 1 //default 6000 ms
>
> *AggregationEnabled*: true //default is true
>
> *ThreadPoolSize*: *15* //default 10
>
> *MaxConnections*: *48* //default 24 - this might have been a bottleneck
> when we flooded KPL with requests. Requests are sent in parallel over
> multiple connections to the backend.
>
> *RecordTtl*: *1* //default 3 ms  - drop record after 10s.
>
> *FailIfThrottled*: *true* //default false - so if throttled, don't retry.
>
>
> We were using parallelism for sinks at 80. So each corresponds to 1
> FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
> MaxConnections is 24 from KPL.
>
> I am not sure about the MaxConnections setting - what does 48 mean here
> -is it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS
> backend via KPL ?
>
> Any thoughts on how not to overwhelm KPL while handling real time
> streaming load to the Kinesis via the FlinkKinesisProducer ?
>
> TIA,
>


Re: FlinkKinesisProducer blocking ?

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

ThreadPoolSize is for per Kinesis producer, which there is one for each
parallel subtask.
If you are constantly hitting the 1MB per second per shard quota, then the
records will be buffered by the FlinkKinesisProducer.
During this process, backpressure is not applied if you have not configured
an upper bound for the buffer queue.

One other thing to note, which might explain the backpresses at regular
intervals that you are experiencing,
is that the FlinkKinesisProducer needs to flush all pending records in the
buffer before the checkpoint can complete for the sink.
That would also apply backpressure upstream.

Gordon

On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan 
wrote:

> Hi Gordon,
> ThreadPoolSize default is 10. I have parallelism of 80 spread out across
> 32 nodes.
> Could it be that the 80 threads get bottlenecked on a common ThreadPool of
> 10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers run
> in separate slots/vCPUs and can be spread across 32 nodes in my case but
> occupying 80 slots/vCPUs. Is my understanding correct and will this be the
> reason that the KPL gets flooded with too many pending requests at regular
> intervals ??
>
> TIA,
>
> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan 
> wrote:
>
>> Thanks,Gordon for your reply.
>>
>> I do not set a queueLimit and so the default unbounded queueSize is 
>> 2147483647.
>> So, it should just be dropping records being produced from the
>> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
>> do not want backpressure as you said it effectively blocks all upstream
>> operators.
>>
>> But from what you are saying, it will apply backpressure when the number
>> of outstanding records accumulated exceeds the default queue limit of 
>> 2147483647
>> or* does it also do it if it is r**ate-limited* *to 1MB per second per
>> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
>> probable.
>>
>> So, calculating Queue Limit:
>> Based on this, my records size = 1600 bytes. I have 96 shards
>> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
>> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
>> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
>> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25
>>
>> Acc. to the docs:
>>
>> By default, FlinkKinesisProducer does not backpressure. Instead, records
>> that cannot be sent because of the rate restriction of 1 MB per second per
>> shard are buffered in an unbounded queue and dropped when their RecordTtl
>> expires.
>>
>> To avoid data loss, you can enable backpressuring by restricting the size
>> of the internal queue:
>>
>> // 200 Bytes per record, 1 shard
>> kinesis.setQueueLimit(500);
>>
>>
>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>> It does however apply backpressure (therefore effectively blocking all
>>> upstream operators) when the number of outstanding records accumulated
>>> exceeds a set limit, configured using the
>>> FlinkKinesisProducer#setQueueLimit
>>> method.
>>>
>>> For starters, you can maybe check if that was set appropriately.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Jark Wu
Hi Kelly,

As a simple workaround, You can remove the watermark definition in
`KafkaStream`, in this way, the stream-stream join will not complain
"Rowtime attributes" exception.

Best,
Jark

On Wed, 22 Jul 2020 at 03:13, Kelly Smith  wrote:

> Thanks Leonard and Danny,
>
>
>
> This makes a lot of sense. My hope here is to only use SQL without any
> specialized Java/Scala code, so it seems it may not be possible to use
> either of these methods yet.
>
>
>
> I’ll open an issue for the LookupTableSource implementation, and look into
> the workaround you suggested in the short term.
>
>
>
> Thanks!
>
> Kelly
>
>
>
> *From: *Leonard Xu 
> *Date: *Monday, July 20, 2020 at 7:49 PM
> *To: *Danny Chan 
> *Cc: *Kelly Smith , Flink ML <
> user@flink.apache.org>
> *Subject: *Re: Flink SQL - Join Lookup Table
>
>
>
> Hi, kelly
>
>
>
> Looks like you want to use fact table(from Kafka) to join a dimension
> table(From filesystem),  dimension table is one kind of Temporal Table,
> temporal table join syntax you could refer Danny's post[1].
>
>
>
> But `FileSystemTableSource` did not implement `LookupTableSource`
> interface yet which means you can not use it as a dimension table, the
> connector that supported `LookupTableSource` includes JDBC、HBase、Hive,
>
> you can created an issue to support `lookupTableSource` for filesystem
> connector.
>
>
>
> Another approach is using Temporal Table Function[1] which can define a
> Temporal table from a dataStream, you can convert your Table(filesystem
> table) to stream and then create a temporal table and then join the
> temporal table.
>
>
>
>
>
> Best
>
> Leonard Xu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
> 
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function
> 
>
>
>
>
>
> 在 2020年7月21日,10:07,Danny Chan  写道:
>
>
>
> Seems you want a temporal table join instead of a two stream join, if that
> is your request, you should use syntax
>
>
>
> Join LookupTable FOR SYSTEM_TIME AS OF …
>
>
>
> See [1] for details.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> 
>
>
>
> Best,
>
> Danny Chan
>
> 在 2020年7月21日 +0800 AM6:32,Kelly Smith ,写道:
>
> Hi folks,
>
>
>
> I have a question Flink SQL. What I want to do is this:
>
>
>
>- Join a simple lookup table (a few rows) to a stream of data to
>enrich the stream by adding a column from the lookup table.
>
>
>
>
>
> For example, a simple lookup table:
>
>
>
> *CREATE TABLE* LookupTable (
> *`computeClass` * STRING,
> *`multiplier`   *
> *FLOAT *) *WITH* (
> *'connector'* = *'filesystem'*,
> *'path'* = *'fpu-multipliers.csv'*,
> *'format'* =
> *'csv' *)
>
>
>
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>
>
>
> *SELECT*`timestamp`,
>
> // ...
> ks.computeClass,
> lt.`multiplier`
> *FROM *KafkaStream ks
>
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
>  Doing a simple join like that gives me this error:
>
>
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.”
>
>
>
> Which leads me to believe that I should use an Interval Join instead, but
> that doesn’t seem to be appropriate since my table is static and has no
> concept of time. Basically, I want to hold the entire lookup table in
> memory, and simply enrich the Kafka stream (which need not be held in
> memory).
>
>
>
>

Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Jingsong Li
Hi Kelly,

There are issues for tracking:
- Filesystem support single file reading:
https://issues.apache.org/jira/browse/FLINK-17398
- Filesystem support LookupJoin:
https://issues.apache.org/jira/browse/FLINK-17397

Best,
Jingsong

On Wed, Jul 22, 2020 at 3:13 AM Kelly Smith  wrote:

> Thanks Leonard and Danny,
>
>
>
> This makes a lot of sense. My hope here is to only use SQL without any
> specialized Java/Scala code, so it seems it may not be possible to use
> either of these methods yet.
>
>
>
> I’ll open an issue for the LookupTableSource implementation, and look into
> the workaround you suggested in the short term.
>
>
>
> Thanks!
>
> Kelly
>
>
>
> *From: *Leonard Xu 
> *Date: *Monday, July 20, 2020 at 7:49 PM
> *To: *Danny Chan 
> *Cc: *Kelly Smith , Flink ML <
> user@flink.apache.org>
> *Subject: *Re: Flink SQL - Join Lookup Table
>
>
>
> Hi, kelly
>
>
>
> Looks like you want to use fact table(from Kafka) to join a dimension
> table(From filesystem),  dimension table is one kind of Temporal Table,
> temporal table join syntax you could refer Danny's post[1].
>
>
>
> But `FileSystemTableSource` did not implement `LookupTableSource`
> interface yet which means you can not use it as a dimension table, the
> connector that supported `LookupTableSource` includes JDBC、HBase、Hive,
>
> you can created an issue to support `lookupTableSource` for filesystem
> connector.
>
>
>
> Another approach is using Temporal Table Function[1] which can define a
> Temporal table from a dataStream, you can convert your Table(filesystem
> table) to stream and then create a temporal table and then join the
> temporal table.
>
>
>
>
>
> Best
>
> Leonard Xu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
> 
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function
> 
>
>
>
>
>
> 在 2020年7月21日,10:07,Danny Chan  写道:
>
>
>
> Seems you want a temporal table join instead of a two stream join, if that
> is your request, you should use syntax
>
>
>
> Join LookupTable FOR SYSTEM_TIME AS OF …
>
>
>
> See [1] for details.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> 
>
>
>
> Best,
>
> Danny Chan
>
> 在 2020年7月21日 +0800 AM6:32,Kelly Smith ,写道:
>
> Hi folks,
>
>
>
> I have a question Flink SQL. What I want to do is this:
>
>
>
>- Join a simple lookup table (a few rows) to a stream of data to
>enrich the stream by adding a column from the lookup table.
>
>
>
>
>
> For example, a simple lookup table:
>
>
>
> *CREATE TABLE* LookupTable (
> *`computeClass` * STRING,
> *`multiplier`   *
> *FLOAT *) *WITH* (
> *'connector'* = *'filesystem'*,
> *'path'* = *'fpu-multipliers.csv'*,
> *'format'* =
> *'csv' *)
>
>
>
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>
>
>
> *SELECT*`timestamp`,
>
> // ...
> ks.computeClass,
> lt.`multiplier`
> *FROM *KafkaStream ks
>
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
>  Doing a simple join like that gives me this error:
>
>
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.”
>
>
>
> Which leads me to believe that I should use an Interval Join instead, but
> that doesn’t seem to be appropriate since my table is static and has no
> concept of time. Basically, I want to hold the entire lookup table in
> memory, and simply enrich the Kafka stream (

Strange stack when job encounter back pressure

2020-07-21 Thread aitozi
Hi,

I notice the job encounter a strange case: the upstream operator is
underpressure, all task in back pressure sample shows HIGH, but when i
jstack the downstream task, I only see the stack below:


   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:745)
- locked <0xb35221a8> (a java.util.ArrayDeque)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:704)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNext(BarrierBuffer.java:170)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:162)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:269)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:113)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:751)
at java.lang.Thread.run(Thread.java:882)


 

It likes that : the downstream say "i am waiting to data" but the upstream
say "I am blocked by the downstream".

Our code is based on 1.5.x

Is there any one also encounter this case or know the reason?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

Your assumption is correct that the discovery interval does not affect the
interval of fetching records.

As a side note, you can actually disable shard discovery, by setting the
value to -1.
The FlinkKinesisProducer would then only call ListShards once at job
startup.

Cheers,
Gordon

On Fri, Jul 10, 2020 at 2:35 AM Vijay Balakrishnan 
wrote:

> Hi,
> I see these 2 constants- SHARD_GETRECORDS_INTERVAL_MILLIS &
> SHARD_DISCOVERY_INTERVAL_MILLIS.
>
> My understanding was SHARD_GETRECORDS_INTERVAL_MILLIS defines how often
> records are fetched from Kinesis Data Stream(KDS). Code seems to be doing
> this in ShardConsumer.run()-->getRecords()
>
> SHARD_DISCOVERY_INTERVAL_MILLIS defines how often the KinesisConsmer
> checks if there are any changes to shards. We don't change shards during
> our Application run.I have changed it to a very high value to avoid this
> check as I was running into ListShards issues with LimitExceedeException
> when using 282 shards
> Would this be a correct understanding of these 2 constants -especially the
> SHARD_DISCOVERY_INTERVAL_MILLIS
>
> My assumption that needs to be validated:
> The SHARD_DISCOVERY_INTERVAL_MILLIS should not affect the fetching of
> records as defined by SHARD_GETRECORDS_INTERVAL_MILLIS.
>
> Code below:
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> getRecsIntervalMs);//2000
>
> /*
> We do not change shards while the app is running.
> So, we can increase SHARD_DISCOVERY_INTERVAL_MILLIS to a very high value
> to avoid any rateLimiting issues from the AWS API with the ListShards call.
> Default is 10s. We can increase this to avoid this LimitExceededException
> as we don't change shards in the middle.
>  */
>
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
> shardDiscoveryInterval);//1800 ms
>
>
> TIA,
>


Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
You can not do that in Flink yet, Flink partition column must be mapped to 
columns from the table schema which you can select from. The syntax is a little 
different from Hive’s =>

create table table_name (
  idint,
  dtDontQuery   string,
  name  string
)
partitioned by (date string)

In which you can declare the partition column name & type at the same time.

Best,
Danny Chan
在 2020年7月21日 +0800 PM11:30,Dongwon Kim ,写道:
> Thanks Jark for the update.
>
> However, getting back to the original question, can I use a nested column 
> directly for CREATE TABLE PARTITIONED BY like below without declaring an 
> additional column?
>
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> I tried (`location`.transId) as well but it fails with an exception:
> > Exception in thread "main" org.apache.flink.table.api.SqlParserException: 
> > SQL parse failed. Encountered "." at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> > at 
> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> > at 
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> > at 
> > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> > at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> > at 
> > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> > ... 3 more
> > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." 
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> > ... 5 more
>
> Best,
>
> Dongwon
>
> > On Wed, Jul 22, 2020 at 12:09 AM Jark Wu  wrote:
> > > Hi Dongwon,
> > >
> > > I think this is a bug in the Filesystem connector which doesn't exclude 
> > > the computed columns when building the TableSource.
> > > I created an issue [1] to track this problem.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://issues.apache.org/jira/browse/FLINK-18665
> > >
> > > > On Tue, 21 Jul 2020 at 17:31, Dongwon Kim  wrote:
> > > > > Hi Danny,
> > > > >
> > > > > >  Which version did you use
> > > > > I use Flink 1.11.0.
> > > > >
> > > > > >  what SQL context throws the error ?
> > > > > I think the declaration itself is not a problem.
> > > > > The exception occurs when I tried to execute the following which I 
> > > > > didn't show you in the previous email:
> > > > > > tEnv.sqlQuery("SELECT type, location FROM 
> > > > > > navi").executeInsert("output")
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Dongwon
> > > > >
> > > > > > On Tue, Jul 21, 2020 at 6:16 PM Danny Chan  
> > > > > > wrote:
> > > > > > > Hi, I execute the sql below
> > > > > > >
> > > > > > > """
> > > > > > >|create table navi (
> > > > > > >|  a STRING,
> > > > > > >|  location ROW
> > > > > > >|) with (
> > > > > > >|  'connector' = 'filesystem',
> > > > > > >|  'path' = 'east-out',
> > > > > > >|  'format' = 'json'
> > > > > > >|)
> > > > > > >|""".stripMargin
> > > > > > > tableEnv.executeSql(sql0)
> > > > > > > val sql =
> > > > > > > """
> 

Re: Parquet batch table sink in Flink 1.11

2020-07-21 Thread Jingsong Li
In table/SQL,

I think we don't need a source/sink for `AvroParquetOutputFormat`, because
the data structure is always Row or RowData, should not be a avro object.

Best,
Jingsong

On Tue, Jul 21, 2020 at 8:09 PM Flavio Pompermaier 
wrote:

> This is what I actually do but I was hoping to be able to get rid of the
> HadoopOutputForma and be able to use a  more comfortable Source/Sink
> implementation.
>
> On Tue, Jul 21, 2020 at 12:38 PM Jingsong Li 
> wrote:
>
>> Hi Flavio,
>>
>> AvroOutputFormat only supports writing Avro files.
>> I think you can use `AvroParquetOutputFormat` as a hadoop output format,
>> and wrap it through Flink `HadoopOutputFormat`.
>>
>> Best,
>> Jingsong
>>
>> On Fri, Jul 17, 2020 at 11:59 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> is there a way to write out Parquet-Avro data using
>>> BatchTableEnvironment with Flink 1.11?
>>> At the moment I'm using the hadoop ParquetOutputFormat but I hope to be
>>> able to get rid of it sooner or later..I saw that there's the
>>> AvroOutputFormat but no support for it using Parquet.
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-21 Thread Vijay Balakrishnan
Hi,
Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
stream(KDS).
Getting following errors:
1.
Throttling
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)

2. ERROR
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 - [2020-06-18 15:49:24.238655] [0x0ed6][0x7fc2086c8700] [error]
[shard_map.cc:150] Shard map update for stream "_write" failed.
Code: *LimitExceededException
Message: Rate exceeded for stream *..._write under account 753274046439.;
retrying in 1500 ms

3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*


https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties

https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/

These are the KPL property changes I am planning to make.

*RequestTimeput*: 1 //default 6000 ms

*AggregationEnabled*: true //default is true

*ThreadPoolSize*: *15* //default 10

*MaxConnections*: *48* //default 24 - this might have been a bottleneck
when we flooded KPL with requests. Requests are sent in parallel over
multiple connections to the backend.

*RecordTtl*: *1* //default 3 ms  - drop record after 10s.

*FailIfThrottled*: *true* //default false - so if throttled, don't retry.


We were using parallelism for sinks at 80. So each corresponds to 1
FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
MaxConnections is 24 from KPL.

I am not sure about the MaxConnections setting - what does 48 mean here -is
it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS backend
via KPL ?

Any thoughts on how not to overwhelm KPL while handling real time streaming
load to the Kinesis via the FlinkKinesisProducer ?

TIA,


Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Kelly Smith
Thanks Leonard and Danny,

This makes a lot of sense. My hope here is to only use SQL without any 
specialized Java/Scala code, so it seems it may not be possible to use either 
of these methods yet.

I’ll open an issue for the LookupTableSource implementation, and look into the 
workaround you suggested in the short term.

Thanks!
Kelly

From: Leonard Xu 
Date: Monday, July 20, 2020 at 7:49 PM
To: Danny Chan 
Cc: Kelly Smith , Flink ML 
Subject: Re: Flink SQL - Join Lookup Table

Hi, kelly

Looks like you want to use fact table(from Kafka) to join a dimension 
table(From filesystem),  dimension table is one kind of Temporal Table, 
temporal table join syntax you could refer Danny's post[1].

But `FileSystemTableSource` did not implement `LookupTableSource` interface yet 
which means you can not use it as a dimension table, the connector that 
supported `LookupTableSource` includes JDBC、HBase、Hive,
you can created an issue to support `lookupTableSource` for filesystem 
connector.

Another approach is using Temporal Table Function[1] which can define a 
Temporal table from a dataStream, you can convert your Table(filesystem table) 
to stream and then create a temporal table and then join the temporal table.


Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function



在 2020年7月21日,10:07,Danny Chan 
mailto:yuzhao@gmail.com>> 写道:

Seems you want a temporal table join instead of a two stream join, if that is 
your request, you should use syntax

Join LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan
在 2020年7月21日 +0800 AM6:32,Kelly Smith 
mailto:kell...@zillowgroup.com>>,写道:

Hi folks,

I have a question Flink SQL. What I want to do is this:


  *   Join a simple lookup table (a few rows) to a stream of data to enrich the 
stream by adding a column from the lookup table.


For example, a simple lookup table:

CREATE TABLE LookupTable (
`computeClass`  STRING,
`multiplier`FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'fpu-multipliers.csv',
'format' = 'csv'
)


And I’ve got a Kafka connector table with rowtime semantics that has a 
`computeClass` field. I simply want to join (in a streaming fashion) the 
`multiplier` field above.


SELECT
`timestamp`,

// ...
ks.computeClass,
lt.`multiplier`
FROM KafkaStream ks

JOIN LookupTable lt ON ks.computeClass = lt.computeClass


Doing a simple join like that gives me this error:

“org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.”

Which leads me to believe that I should use an Interval Join instead, but that 
doesn’t seem to be appropriate since my table is static and has no concept of 
time. Basically, I want to hold the entire lookup table in memory, and simply 
enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly



Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread David Magalhães
Hi Congxian, the leftover files were on the local disk of the TaskManager.
But looking better into the issue, I think the issue was the "logs". The
sink, in this case, was writing one line into the logger (I was writing 8
GB in total), and that makes more sense. So nothing wrong with the
Flink/Savepoint behaviour.

Thanks,
David

On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu 
wrote:

> Hi David
>Sorry for the late reply, seems I missed your previous email.
>I'm not sure I fully understand here, do the leftover files on s3
> filesystem or the local disk of Taskmanager?. Currently, the savepoint data
> will directly write to output stream of the underlying file(here is s3
> file), you can have a look at the code here[1].
>
> [1]
> https://github.com/apache/flink/blob/1908b2ce6ffb8efc7d339136787494b4fe70846f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160
>
> Best,
> Congxian
>
>
> David Magalhães  于2020年7月21日周二 下午4:10写道:
>
>> Hi Till, I'm using s3:// schema, but not sure what was the default used
>> if s3a or s3p.
>>
>> then the state backend should try to directly write to the target file
>>> system
>>
>>
>> That was the behaviour that I saw the second time I've run this with more
>> slots. Does the savepoint write directly to S3 via streaming or write the
>> savepoint to memory first before sending to S3?
>>
>> Thanks,
>> David
>>
>> On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann 
>> wrote:
>>
>>> Hi David,
>>>
>>> which S3 file system implementation are you using? If I'm not mistaken,
>>> then the state backend should try to directly write to the target file
>>> system. If this should result in temporary files on your TM, then this
>>> might be a problem of the file system implementation. Having access to the
>>> logs could also help to better understand whats going on.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 14, 2020 at 11:57 AM David Magalhães 
>>> wrote:
>>>
 Hi Congxian, sorry for the late reply.

 I'm using the filesystem with an S3 path as the default state backend
 in flink-conf.yml (state.backend: filesystem).
 The Flink version I'm using is 1.10.1.

 By "The task manager did not clean up the state", I mean what the
 taskmanager was writing on disk the savepoint file, but it didn't delete it
 after the other taskmanager had an issue with the disk being full. The
 expected scenario would be both taskmanagers remove the savepoint they were
 trying to do from the disk, but only the one that reached 100% disk space
 use did it.

 For my scenario, I'm using the Flink REST API to start/deploy jobs. A
 retained checkpoint isn't supported in REST API and even if it was, I think
 it doesn't fit my scenario (stop a job, and start the new one from the
 saved state).

 Thanks,
 David

 On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu 
 wrote:

> Hi David
>
> As you say the savepoint use local disk, I assume that you use
> RocksDBStateBackend.
> What's the flink version are you using now?
>
> What do you mean "The task manager did not clean up the state"?, does
> that mean the local disk space did not  clean up, do the task encounter
> failover in this period?
>
> The snapshot speed will be limited by the network bandwidth and the
> local io performance.
> IIUC, currently only checkpoint support local recovery
>
> PS: If you want the snapshot complete quickly, maybe you can try
> retained checkpoint[1], and multiple threads uploads[2]
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
> [2] https://issues.apache.org/jira/browse/FLINK-11008
>
> Best,
> Congxian
>
>
> David Magalhães  于2020年7月10日周五 下午7:37写道:
>
>> Hi, yesterday when I was creating a savepoint (to S3, around 8GB of
>> state) using 2 TaskManager (8 GB) and it failed because one of the task
>> managers fill up the disk (probably didn't have enough RAM to save the
>> state into S3 directly,I don't know what was the disk space, and reached
>> 100% usage space and the other one reached 99%).
>>
>> After the crash, the task manager that reach 100% deleted the "failed
>> savepoint" from the local disk but the other one that reached 99% kept 
>> it.
>> Shouldn't this task manager also clean up the failed state?
>>
>> After cleaning up the disk of that task manager, I've increased the
>> parallelism to 6, created a new state of 8GB and all went smoothly, but 
>> it
>> took 8 minutes to start processing in the new job created with the 
>> previous
>> savepoint.
>>
>> [image: flink_grafana.png]
>> Here is the network IO from the 6 task managers used and I have a few
>> questions:
>>
>> - Isn't 25 Mbps of average speed a bit low? What could be the
>>

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Thanks Jark for the update.

However, getting back to the original question, can I use a nested column
directly for CREATE TABLE PARTITIONED BY like below without declaring an
additional column?

CREATE TABLE output
> PARTITIONED BY (`location.transId`)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 'east-out',
>   'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>

I tried (`location`.transId) as well but it fails with an exception:

> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "." at line 3, column 27.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "." at line 3, column 27.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> ... 3 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
> "." at line 3, column 27.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> ... 5 more


Best,

Dongwon

On Wed, Jul 22, 2020 at 12:09 AM Jark Wu  wrote:

> Hi Dongwon,
>
> I think this is a bug in the Filesystem connector which doesn't exclude
> the computed columns when building the TableSource.
> I created an issue [1] to track this problem.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18665
>
> On Tue, 21 Jul 2020 at 17:31, Dongwon Kim  wrote:
>
>> Hi Danny,
>>
>>  Which version did you use
>>
>> I use Flink 1.11.0.
>>
>>
>>>  what SQL context throws the error ?
>>
>> I think the declaration itself is not a problem.
>> The exception occurs when I tried to execute the following which I didn't
>> show you in the previous email:
>>
>>> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
>>
>>
>> Thanks,
>>
>> Dongwon
>>
>> On Tue, Jul 21, 2020 at 6:16 PM Danny Chan  wrote:
>>
>>> Hi, I execute the sql below
>>>
>>>   """
>>> |create table navi (
>>> |  a STRING,
>>> |  location ROW
>>> |) with (
>>> |  'connector' = 'filesystem',
>>> |  'path' = 'east-out',
>>> |  'format' = 'json'
>>> |)
>>> |""".stripMargin
>>> tableEnv.executeSql(sql0)
>>> val sql =
>>>   """
>>> |CREATE TABLE output (
>>> |  `partition` AS location.transId
>>> |) PARTITIONED BY (`partition`)
>>> |WITH (
>>> |  'connector' = 'filesystem',
>>> |  'path' = 'east-out',
>>> |  'format' = 'json'
>>> |) LIKE navi (EXCLUDING ALL)
>>> |""".stripMargin
>>> tableEnv.executeSql(sql)
>>>
>>>
>>> In master branch, both are correct, can you share you stack trace detail
>>> ? Which version did you use and what SQL context throws the error ?
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年7月21日 +0800 PM4:55,Dongwon Kim ,写道:
>>>
>>> Hi,
>>>
>>> I want to create subdirectories named after values of a nested column,
>>> location.transId.
>>>
>>> This is my first attempt:
>>>
 CREATE TABLE output
 PARTITIONED BY (`location.transId`)
 WITH (
   'connector' = 'filesystem',
   'path' = 'east-out',
   'format' = 'json'
 ) LIKE navi (EXCLUDING ALL)

>>>
>>> It fails with the followi

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Jark Wu
Hi Dongwon,

I think this is a bug in the Filesystem connector which doesn't exclude the
computed columns when building the TableSource.
I created an issue [1] to track this problem.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18665

On Tue, 21 Jul 2020 at 17:31, Dongwon Kim  wrote:

> Hi Danny,
>
>  Which version did you use
>
> I use Flink 1.11.0.
>
>
>>  what SQL context throws the error ?
>
> I think the declaration itself is not a problem.
> The exception occurs when I tried to execute the following which I didn't
> show you in the previous email:
>
>> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
>
>
> Thanks,
>
> Dongwon
>
> On Tue, Jul 21, 2020 at 6:16 PM Danny Chan  wrote:
>
>> Hi, I execute the sql below
>>
>>   """
>> |create table navi (
>> |  a STRING,
>> |  location ROW
>> |) with (
>> |  'connector' = 'filesystem',
>> |  'path' = 'east-out',
>> |  'format' = 'json'
>> |)
>> |""".stripMargin
>> tableEnv.executeSql(sql0)
>> val sql =
>>   """
>> |CREATE TABLE output (
>> |  `partition` AS location.transId
>> |) PARTITIONED BY (`partition`)
>> |WITH (
>> |  'connector' = 'filesystem',
>> |  'path' = 'east-out',
>> |  'format' = 'json'
>> |) LIKE navi (EXCLUDING ALL)
>> |""".stripMargin
>> tableEnv.executeSql(sql)
>>
>>
>> In master branch, both are correct, can you share you stack trace detail
>> ? Which version did you use and what SQL context throws the error ?
>>
>> Best,
>> Danny Chan
>> 在 2020年7月21日 +0800 PM4:55,Dongwon Kim ,写道:
>>
>> Hi,
>>
>> I want to create subdirectories named after values of a nested column,
>> location.transId.
>>
>> This is my first attempt:
>>
>>> CREATE TABLE output
>>> PARTITIONED BY (`location.transId`)
>>> WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'east-out',
>>>   'format' = 'json'
>>> ) LIKE navi (EXCLUDING ALL)
>>>
>>
>> It fails with the following errors:
>>
>>> Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: Partition column
>>> 'location.transId' not defined in the table schema. Available columns:
>>> ['type', 'location']
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>>>
>>
>> As It seems like nested columns are not recognized as a eligible column
>> for PARTITIONED BY, I tried the following:
>>
>>> CREATE TABLE output (
>>>   `partition` AS location.transId
>>> ) PARTITIONED BY (`partition`)
>>> WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'east-out',
>>>   'format' = 'json'
>>> ) LIKE navi (EXCLUDING ALL)
>>>
>> It also fails:
>>
>>>  Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: The field count of logical
>>> schema of the table does not match with the field count of physical schema
>>
>> . The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
>> STRING>]
>> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
>> STRING>,STRING].
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>>


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-21 Thread Till Rohrmann
Thanks a lot for being our release managers Zhijiang and Piotr and thanks
to everyone who helped making this release possible.

Cheers,
Till

On Wed, Jul 8, 2020 at 10:59 AM godfrey he  wrote:

> Congratulations!
>
> Thanks Zhijiang and Piotr for the great work, and thanks everyone for
> their contribution!
>
> Best,
> Godfrey
>
> Benchao Li  于2020年7月8日周三 下午12:39写道:
>
>> Congratulations!  Thanks Zhijiang & Piotr for the great work as release
>> managers.
>>
>> Rui Li  于2020年7月8日周三 上午11:38写道:
>>
>>> Congratulations! Thanks Zhijiang & Piotr for the hard work.
>>>
>>> On Tue, Jul 7, 2020 at 10:06 PM Zhijiang 
>>> wrote:
>>>
 The Apache Flink community is very happy to announce the release of
 Apache Flink 1.11.0, which is the latest major release.

 Apache Flink® is an open-source stream processing framework for 
 distributed,
 high-performing, always-available, and accurate data streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements for this new major release:
 https://flink.apache.org/news/2020/07/06/release-1.11.0.html

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364

 We would like to thank all contributors of the Apache Flink community
 who made this release possible!

 Cheers,
 Piotr & Zhijiang

>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>


Re: Parquet batch table sink in Flink 1.11

2020-07-21 Thread Flavio Pompermaier
This is what I actually do but I was hoping to be able to get rid of the
HadoopOutputForma and be able to use a  more comfortable Source/Sink
implementation.

On Tue, Jul 21, 2020 at 12:38 PM Jingsong Li  wrote:

> Hi Flavio,
>
> AvroOutputFormat only supports writing Avro files.
> I think you can use `AvroParquetOutputFormat` as a hadoop output format,
> and wrap it through Flink `HadoopOutputFormat`.
>
> Best,
> Jingsong
>
> On Fri, Jul 17, 2020 at 11:59 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> is there a way to write out Parquet-Avro data using BatchTableEnvironment
>> with Flink 1.11?
>> At the moment I'm using the hadoop ParquetOutputFormat but I hope to be
>> able to get rid of it sooner or later..I saw that there's the
>> AvroOutputFormat but no support for it using Parquet.
>>
>> Best,
>> Flavio
>>
>
>
> --
> Best, Jingsong Lee
>


Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Paul Lam
Thanks for your help anyway, Jingsong & Rui.

I read the jira description, and I’m +1 to check the lazy initiation first. It 
looks like the file creation is skipped or it doesn’t block the writing, and 
I’ve seen a bucket was writing to a file that was not supposed to exist, e.g. 
its parent dir was not created yet. 

Best,
Paul Lam

> 2020年7月21日 18:50,Jingsong Li  写道:
> 
> Hi,
> 
> Sorry for this. This work around only works in Hive 2+.
> We can only wait for 1.11.2.
> 
> Best,
> Jingsong
> 
> On Tue, Jul 21, 2020 at 6:15 PM Rui Li  > wrote:
> Hi Paul,
> 
> I believe Jingsong meant try using native writer, for which the option key is 
> `table.exec.hive.fallback-mapred-writer` and is by default set to true.
> You can set it to false like this: 
> tableEnv.getConfig().getConfiguration().set( 
> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)
> 
> On Tue, Jul 21, 2020 at 6:07 PM Paul Lam  > wrote:
> Hi JingSong,
> 
> Thanks for your advice! But IIUC, it seems that 
> `table.exec.hive.fallback-mapred-reader` is false by default? 
> 
> Moreover, explicitly setting this option might cause a serialization issue. 
> Wonder if I’m setting it in the right way? 
> 
> ```
> tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer",
>  "false”);
> ```
> 
> The error it caused:
> 
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>   at com.my.package.class(JobEntry.java:65)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   ... 11 more
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot serialize operator object class 
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>   at 
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>   at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>   at 
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>   ... 19 more
> Caused by: java.io.NotSerializableException: 
> org.apache.hadoop.conf.Configuration
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.Obj

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
Hi David
   Sorry for the late reply, seems I missed your previous email.
   I'm not sure I fully understand here, do the leftover files on s3
filesystem or the local disk of Taskmanager?. Currently, the savepoint data
will directly write to output stream of the underlying file(here is s3
file), you can have a look at the code here[1].

[1]
https://github.com/apache/flink/blob/1908b2ce6ffb8efc7d339136787494b4fe70846f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160

Best,
Congxian


David Magalhães  于2020年7月21日周二 下午4:10写道:

> Hi Till, I'm using s3:// schema, but not sure what was the default used if
> s3a or s3p.
>
> then the state backend should try to directly write to the target file
>> system
>
>
> That was the behaviour that I saw the second time I've run this with more
> slots. Does the savepoint write directly to S3 via streaming or write the
> savepoint to memory first before sending to S3?
>
> Thanks,
> David
>
> On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann 
> wrote:
>
>> Hi David,
>>
>> which S3 file system implementation are you using? If I'm not mistaken,
>> then the state backend should try to directly write to the target file
>> system. If this should result in temporary files on your TM, then this
>> might be a problem of the file system implementation. Having access to the
>> logs could also help to better understand whats going on.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 14, 2020 at 11:57 AM David Magalhães 
>> wrote:
>>
>>> Hi Congxian, sorry for the late reply.
>>>
>>> I'm using the filesystem with an S3 path as the default state backend in
>>> flink-conf.yml (state.backend: filesystem).
>>> The Flink version I'm using is 1.10.1.
>>>
>>> By "The task manager did not clean up the state", I mean what the
>>> taskmanager was writing on disk the savepoint file, but it didn't delete it
>>> after the other taskmanager had an issue with the disk being full. The
>>> expected scenario would be both taskmanagers remove the savepoint they were
>>> trying to do from the disk, but only the one that reached 100% disk space
>>> use did it.
>>>
>>> For my scenario, I'm using the Flink REST API to start/deploy jobs. A
>>> retained checkpoint isn't supported in REST API and even if it was, I think
>>> it doesn't fit my scenario (stop a job, and start the new one from the
>>> saved state).
>>>
>>> Thanks,
>>> David
>>>
>>> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu 
>>> wrote:
>>>
 Hi David

 As you say the savepoint use local disk, I assume that you use
 RocksDBStateBackend.
 What's the flink version are you using now?

 What do you mean "The task manager did not clean up the state"?, does
 that mean the local disk space did not  clean up, do the task encounter
 failover in this period?

 The snapshot speed will be limited by the network bandwidth and the
 local io performance.
 IIUC, currently only checkpoint support local recovery

 PS: If you want the snapshot complete quickly, maybe you can try
 retained checkpoint[1], and multiple threads uploads[2]
 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
 [2] https://issues.apache.org/jira/browse/FLINK-11008

 Best,
 Congxian


 David Magalhães  于2020年7月10日周五 下午7:37写道:

> Hi, yesterday when I was creating a savepoint (to S3, around 8GB of
> state) using 2 TaskManager (8 GB) and it failed because one of the task
> managers fill up the disk (probably didn't have enough RAM to save the
> state into S3 directly,I don't know what was the disk space, and reached
> 100% usage space and the other one reached 99%).
>
> After the crash, the task manager that reach 100% deleted the "failed
> savepoint" from the local disk but the other one that reached 99% kept it.
> Shouldn't this task manager also clean up the failed state?
>
> After cleaning up the disk of that task manager, I've increased the
> parallelism to 6, created a new state of 8GB and all went smoothly, but it
> took 8 minutes to start processing in the new job created with the 
> previous
> savepoint.
>
> [image: flink_grafana.png]
> Here is the network IO from the 6 task managers used and I have a few
> questions:
>
> - Isn't 25 Mbps of average speed a bit low? What could be the
> limitation?
> - For 8 GB of state, gives around 7 minutes to download it [ 8000 MB
> /(25Mbps/8*6 task managers)/60 seconds ], that should match the consistent
> part of 7/8 minute graph, and then started reading from Kafka topic.
> - Can I mitigate this with task local recovery [1]? Or is this only
> for a checkpoint ?
> - We are using *m5.xlarge* (4 vcpu, 16GB RAM) with 2 slots per TM.
>
> Thanks,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/stat

Re: Unsubscribe

2020-07-21 Thread Yangze Guo
Hi Harshvardhan,

You need to send an email to user-unsubscr...@flink.apache.org to unsubscribe.

Best,
Yangze Guo

On Tue, Jul 21, 2020 at 7:12 PM Harshvardhan Agrawal
 wrote:

>
> --
> Regards,
> Harshvardhan


Unsubscribe

2020-07-21 Thread Harshvardhan Agrawal
-- 
Regards,
Harshvardhan


Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Jingsong Li
Hi,

Sorry for this. This work around only works in Hive 2+.
We can only wait for 1.11.2.

Best,
Jingsong

On Tue, Jul 21, 2020 at 6:15 PM Rui Li  wrote:

> Hi Paul,
>
> I believe Jingsong meant try using native writer, for which the option key
> is `table.exec.hive.fallback-mapred-writer` and is by default set to true.
> You can set it to false like
> this: tableEnv.getConfig().getConfiguration().set(
> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)
>
> On Tue, Jul 21, 2020 at 6:07 PM Paul Lam  wrote:
>
>> Hi JingSong,
>>
>> Thanks for your advice! But IIUC, it seems
>> that `table.exec.hive.fallback-mapred-reader` is false by default?
>>
>> Moreover, explicitly setting this option might cause a serialization
>> issue. Wonder if I’m setting it in the right way?
>>
>> ```
>>
>> tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer",
>>  "false”);
>>
>> ```
>>
>> The error it caused:
>>
>> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>>  at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
>>  at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>>  at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>>  at com.my.package.class(JobEntry.java:65)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>  ... 11 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
>> Cannot serialize operator object class 
>> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
>>  at 
>> org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>>  at 
>> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>>  at 
>> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>>  at 
>> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>>  at 
>> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>>  at 
>> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>>  at 
>> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>>  at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>>  ... 19 more
>> Caused by: java.io.NotSerializableException: 
>> org.apache.hadoop.conf.Configuration
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>  at 
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>  at 
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>  at 
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>  at 
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>  at 
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>

Re: Beam Word Cound on Flink/kubernetes Issue with task manager ot getting picked up

2020-07-21 Thread Yang Wang
Hi Avijit Saha,

I think you could use 'kubectl describe pod
flink-task-manager-5cc79c5795-7mnqh' to get more information.
Usually, it is caused by no enough resource in your K8s cluster.


Best,
Yang

Avijit Saha  于2020年7月14日周二 上午7:12写道:

> Hi,
>
> I have a docker image of the Beam WordCount example that reads a
> status file and produces a output one time with word counts etc.
>
> This runs fine as a separate job-manager and task-manager when run from
> docker-compose locally.
>
> Now, I am trying to deploy and run this on my Kubernetes cluster as per
> instructions at
> https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes
> .
>
> Deployment of Job-cluster and task-manager goes thro fine but the
> task-manager never seems to be picked up - it always stays in 'Pending'
> status!
> Is this expected behavior for a one-time Job like Word-Count application
> or am I missing something?
>
> Thanks
> Avijit
>
> $ kubectl get pods
> NAME  READY   STATUS
>  RESTARTS   AGE
> flink-job-cluster-kw85v   2/2   Running   2
>  15m
> flink-task-manager-5cc79c5795-7mnqh   0/2 Pending   0  14m
>


Re: Key group is not in KeyGroupRange

2020-07-21 Thread Ori Popowski
I should have mentioned, I've opened a bug for it
https://issues.apache.org/jira/browse/FLINK-18637. So the discussion moved
there.

On Tue, Jul 14, 2020 at 2:03 PM Ori Popowski  wrote:

> I'm getting this error when creating a savepoint. I've read in
> https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by
> unstable hashcode or equals on the key, or improper use of
> reinterpretAsKeyedStream.
>
> My key is a string and I don't use reinterpretAsKeyedStream, so what's
> going on?
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 962fc8e984e7ca1ed65a038aa62ce124 failed.
> at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:429)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingChec

Re: Parquet batch table sink in Flink 1.11

2020-07-21 Thread Jingsong Li
Hi Flavio,

AvroOutputFormat only supports writing Avro files.
I think you can use `AvroParquetOutputFormat` as a hadoop output format,
and wrap it through Flink `HadoopOutputFormat`.

Best,
Jingsong

On Fri, Jul 17, 2020 at 11:59 PM Flavio Pompermaier 
wrote:

> Hi to all,
> is there a way to write out Parquet-Avro data using BatchTableEnvironment
> with Flink 1.11?
> At the moment I'm using the hadoop ParquetOutputFormat but I hope to be
> able to get rid of it sooner or later..I saw that there's the
> AvroOutputFormat but no support for it using Parquet.
>
> Best,
> Flavio
>


-- 
Best, Jingsong Lee


Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Rui Li
Hi Paul,

I believe Jingsong meant try using native writer, for which the option key
is `table.exec.hive.fallback-mapred-writer` and is by default set to true.
You can set it to false like
this: tableEnv.getConfig().getConfiguration().set(
HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)

On Tue, Jul 21, 2020 at 6:07 PM Paul Lam  wrote:

> Hi JingSong,
>
> Thanks for your advice! But IIUC, it seems
> that `table.exec.hive.fallback-mapred-reader` is false by default?
>
> Moreover, explicitly setting this option might cause a serialization
> issue. Wonder if I’m setting it in the right way?
>
> ```
>
> tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer",
>  "false”);
>
> ```
>
> The error it caused:
>
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>   at com.my.package.class(JobEntry.java:65)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   ... 11 more
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot serialize operator object class 
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
>   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>   at 
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>   at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>   at 
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>   ... 19 more
> Caused by: java.io.NotSerializableException: 
> org.apache.hadoop.conf.Configuration
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteF

Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Paul Lam
Hi JingSong,

Thanks for your advice! But IIUC, it seems that 
`table.exec.hive.fallback-mapred-reader` is false by default? 

Moreover, explicitly setting this option might cause a serialization issue. 
Wonder if I’m setting it in the right way? 

```
tableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-writer",
 "false”);
```

The error it caused:

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:715)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.my.package.class(JobEntry.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
serialize operator object class 
org.apache.flink.streaming.api.operators.SimpleOperatorFactory.
at 
org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:263)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:495)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:314)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:288)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:260)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:169)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:109)
at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
at 
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at 
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at 
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
... 19 more
Caused by: java.io.NotSerializableException: 
org.apache.hadoop.conf.Configuration
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStr

Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Paul Lam
Hi Rui,

I reproduced the error with a minimum case, the SQL is similar to `insert into 
hive_table_x select simple_string from kafka_table_b`. 

I’m pretty sure it’s not related to the table schema. And I removed all the 
optional properties in the Hive table DDL, the error still happened.

Best,
Paul Lam

> 2020年7月21日 16:24,Rui Li  写道:
> 
> Hey Paul,
> 
> Could you please share more about your job, e.g. the schema of your Hive 
> table, whether it's partitioned, and the table properties you've set?
> 
> On Tue, Jul 21, 2020 at 4:02 PM Paul Lam  > wrote:
> Hi,
> 
> I'm doing a POC on Hive connectors and find that when writing orc format Hive 
> tables, the job failed with FileNotFoundException right after ingesting data 
> (full stacktrace at the bottom of the mail).
> 
> The error can be steadily reproduced in my environment, which is Hadoop 
> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens in 
> orc tables, while other bulk formats are fine.
> 
> Does anyone have an idea about this error? Any comment and suggestions are 
> appreciated. Thanks!
> 
> Stacktrace:
> 
> Caused by: java.io.FileNotFoundException: File does not exist: 
> hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
>  <>
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
>   at 
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
>   at 
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>   at 
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at StreamExecCalc$2.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at SourceConversion$1.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.

Re: Simple MDC logs don't show up

2020-07-21 Thread Fabian Hueske
Hi,

When running your code in the IDE, everything runs in the same local JVM.
When you run the job on Kubernetes, the situation is very different.
Your code runs in multiple JVM processes distributed in a cluster.

Flink provides a metrics collection system that you should use to collect
metrics from the various processes.
Please have a look at the metrics documentation [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html


Am Mo., 20. Juli 2020 um 15:28 Uhr schrieb Manish G <
manish.c.ghildi...@gmail.com>:

> Hi All,
>
> I have some very simple MDC logs in my flink job:
>
> MDC.put("methodName", new Object() 
> {}.getClass().getEnclosingMethod().getName());
> MDC.put("className", this.getClass().getSimpleName());
>
> When I run flink job locally, I can see them in the application logs.
>
> But when I run the same job on kubernetes clutter, these don't show up.
>
> Any input here?
>
> With regards
>
>


Re: Flink rest api cancel job

2020-07-21 Thread Fabian Hueske
Hi White,

Can you describe your problem in more detail?

* What is your Flink version?
* How do you deploy the job (application / session cluster), (Kubernetes,
Docker, YARN, ...)
* What kind of job are you running (DataStream, Table/SQL, DataSet)?

Best, Fabian

Am Mo., 20. Juli 2020 um 08:42 Uhr schrieb snack white <
amazingu...@gmail.com>:

> Hi,
>   When I using rest api to cancel my job , the rest 9 TM has been
> canceled quickly , but the other one TM is always cancelling status ,
> someone can show me how can I solve the question .
> Thanks,
> White


RE: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-21 Thread B.Zhou
Hi Fabian,

Thanks for the reply. I also created a JIRA: 
https://issues.apache.org/jira/browse/FLINK-18641 yesterday. I think we can 
extend our discussion there.

Best Regards,
Brian

From: Fabian Hueske 
Sent: Tuesday, July 21, 2020 17:35
To: Zhou, Brian
Cc: user; Arvid Heise; Piotr Nowojski
Subject: Re: Pravega connector cannot recover from the checkpoint due to 
"Failure to finalize checkpoint"


[EXTERNAL EMAIL]
Hi Brian,

AFAIK, Arvid and Piotr (both in CC) have been working on the threading model of 
the checkpoint coordinator.
Maybe they can help with this question.

Best, Fabian

Am Mo., 20. Juli 2020 um 03:36 Uhr schrieb 
mailto:b.z...@dell.com>>:
Anyone can help us on this issue?

Best Regards,
Brian

From: Zhou, Brian
Sent: Wednesday, July 15, 2020 18:26
To: 'user@flink.apache.org'
Subject: Pravega connector cannot recover from the checkpoint due to "Failure 
to finalize checkpoint"

Hi community,
To give some background, https://github.com/pravega/flink-connectors is a 
Pravega connector for Flink. The ReaderCheckpointHook[1] class uses the Flink 
`MasterTriggerRestoreHook` interface to trigger the Pravega checkpoint during 
Flink checkpoints to make sure the data recovery. We experienced the failures 
in the latest Flink 1.11 upgrade with the checkpoint recovery, there are some 
timeout issues for the continuous checkpoint failure on some of the test cases.
Error stacktrace:
2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
acknowledgement message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the 
pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
not been fully acknowledged yet
 at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
 at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
 ... 9 common frames omitted
After some investigation, the main problem is found. It is about the checkpoint 
recovery. When Flink CheckpointCoordinator wants to finalize a checkpoint, it 
needs to check everything is acknowledged, but for some reason, the master 
state still has our ReaderCheckpointHook remaining unack-ed, hence leading the 
checkpoint failure in the complete stage.
In the PendingCheckpoint::snapshotMasterState, there is an async call to 
acknowledge the master state for each hook. But it returned before the 
acknowledgement.
I think it might be related to the latest changes of the thread model of the 
checkpoint coordinator. Can someone help to verify?

Reproduce procedure:
Checkout this branch 
https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and run 
below test case: FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint

[1] 
https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java

Best Regards,
Brian



Re: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-21 Thread Fabian Hueske
Hi Brian,

AFAIK, Arvid and Piotr (both in CC) have been working on the threading
model of the checkpoint coordinator.
Maybe they can help with this question.

Best, Fabian

Am Mo., 20. Juli 2020 um 03:36 Uhr schrieb :

> Anyone can help us on this issue?
>
>
>
> Best Regards,
>
> Brian
>
>
>
> *From:* Zhou, Brian
> *Sent:* Wednesday, July 15, 2020 18:26
> *To:* 'user@flink.apache.org'
> *Subject:* Pravega connector cannot recover from the checkpoint due to
> "Failure to finalize checkpoint"
>
>
>
> Hi community,
>
> To give some background, https://github.com/pravega/flink-connectors is a
> Pravega connector for Flink. The ReaderCheckpointHook[1] class uses the
> Flink `MasterTriggerRestoreHook` interface to trigger the Pravega
> checkpoint during Flink checkpoints to make sure the data recovery. We
> experienced the failures in the latest Flink 1.11 upgrade with the
> checkpoint recovery, there are some timeout issues for the continuous
> checkpoint failure on some of the test cases.
> Error stacktrace:
>
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint
> acknowledgement message
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 3. Failure reason: Failure to finalize
> checkpoint.
>
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>
>  at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>
>  at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>  at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint
> has not been fully acknowledged yet
>
>  at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>
>  at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>
>  ... 9 common frames omitted
>
> After some investigation, the main problem is found. It is about the
> checkpoint recovery. When Flink CheckpointCoordinator wants to finalize a
> checkpoint, it needs to check everything is acknowledged, but for some
> reason, the master state still has our ReaderCheckpointHook remaining
> unack-ed, hence leading the checkpoint failure in the complete stage.
> In the PendingCheckpoint::snapshotMasterState, there is an async call to
> acknowledge the master state for each hook. But it returned before the
> acknowledgement.
> I think it might be related to the latest changes of the thread model of
> the checkpoint coordinator. Can someone help to verify?
>
>
>
> *Reproduce procedure:*
> Checkout this branch
> https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and
> run below test case:
> FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint
>
>
>
> [1]
> https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java
>
>
>
> Best Regards,
>
> Brian
>
>
>


Re: Custom metrics output

2020-07-21 Thread Fabian Hueske
Hi Joris,

I don't think that the approach of "add methods in operator class code that
can be called from the main Flink program" will work.

The most efficient approach would be implementing a ProcessFunction that
counts in 1-min time buckets (using event-time semantics) and updates the
metrics.
If you need the metric values to be exact, you can keep the intermediate
counts as operator state.
I would not use a KeyedProcessFunction because you didn't mention a key and
to save the overhead of the shuffle.

You can integrate the ProcessFunctions in different ways in your job.

1) just embed it into the regular flow. The ProcessFunction would just
count and forward every record it receives.
2) fork off a stream of records that just just hold the timestamp to a side
output and apply the ProcessFunction on the forked-off stream.

I think the first approach is simpler and more efficient. The
ProcessFunction would be an identity function to your actual data, just
counting and reporting metrics.

Best, Fabian

Am Mo., 20. Juli 2020 um 01:30 Uhr schrieb Joris Geer <
joris.van.der.g...@oracle.com>:

> Hi,
>
> We want to collect metrics for stream processing, typically counts
> aggregated over 1-minute buckets. However, we want these 1-minute
> boundaries determined by timestamps within the data records. Flink metrics
> do not handle this so we want to roll our own. How to proceed ? Some of our
> team members believe we can add methods in operator class code that can be
> called from the main Flink program, whist I am not sure this is supposed to
> be possible. Others consider using a side output stream with a record per
> input record and use Flink operators to do the aggregation. That may double
> the amount of records processed.
>
> Can we extend the Flink metrics to provide such aggregation ?
>
> Regards,
>
> Joris
>
>


Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Hi Danny,

 Which version did you use

I use Flink 1.11.0.


>  what SQL context throws the error ?

I think the declaration itself is not a problem.
The exception occurs when I tried to execute the following which I didn't
show you in the previous email:

> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")


Thanks,

Dongwon

On Tue, Jul 21, 2020 at 6:16 PM Danny Chan  wrote:

> Hi, I execute the sql below
>
>   """
> |create table navi (
> |  a STRING,
> |  location ROW
> |) with (
> |  'connector' = 'filesystem',
> |  'path' = 'east-out',
> |  'format' = 'json'
> |)
> |""".stripMargin
> tableEnv.executeSql(sql0)
> val sql =
>   """
> |CREATE TABLE output (
> |  `partition` AS location.transId
> |) PARTITIONED BY (`partition`)
> |WITH (
> |  'connector' = 'filesystem',
> |  'path' = 'east-out',
> |  'format' = 'json'
> |) LIKE navi (EXCLUDING ALL)
> |""".stripMargin
> tableEnv.executeSql(sql)
>
>
> In master branch, both are correct, can you share you stack trace detail ?
> Which version did you use and what SQL context throws the error ?
>
> Best,
> Danny Chan
> 在 2020年7月21日 +0800 PM4:55,Dongwon Kim ,写道:
>
> Hi,
>
> I want to create subdirectories named after values of a nested column,
> location.transId.
>
> This is my first attempt:
>
>> CREATE TABLE output
>> PARTITIONED BY (`location.transId`)
>> WITH (
>>   'connector' = 'filesystem',
>>   'path' = 'east-out',
>>   'format' = 'json'
>> ) LIKE navi (EXCLUDING ALL)
>>
>
> It fails with the following errors:
>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Partition column
>> 'location.transId' not defined in the table schema. Available columns:
>> ['type', 'location']
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>>
>
> As It seems like nested columns are not recognized as a eligible column
> for PARTITIONED BY, I tried the following:
>
>> CREATE TABLE output (
>>   `partition` AS location.transId
>> ) PARTITIONED BY (`partition`)
>> WITH (
>>   'connector' = 'filesystem',
>>   'path' = 'east-out',
>>   'format' = 'json'
>> ) LIKE navi (EXCLUDING ALL)
>>
> It also fails:
>
>>  Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: The field count of logical
>> schema of the table does not match with the field count of physical schema
>
> . The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
> STRING>]
> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
> STRING>,STRING].
>
> Thanks in advance,
>
> Dongwon
>
>


Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
Hi, I execute the sql below

"""
   |create table navi (
   |  a STRING,
   |  location ROW
   |) with (
   |  'connector' = 'filesystem',
   |  'path' = 'east-out',
   |  'format' = 'json'
   |)
   |""".stripMargin
tableEnv.executeSql(sql0)
val sql =
"""
   |CREATE TABLE output (
   |  `partition` AS location.transId
   |) PARTITIONED BY (`partition`)
   |WITH (
   |  'connector' = 'filesystem',
   |  'path' = 'east-out',
   |  'format' = 'json'
   |) LIKE navi (EXCLUDING ALL)
   |""".stripMargin
tableEnv.executeSql(sql)

In master branch, both are correct, can you share you stack trace detail ? 
Which version did you use and what SQL context throws the error ?

Best,
Danny Chan
在 2020年7月21日 +0800 PM4:55,Dongwon Kim ,写道:
> Hi,
>
> I want to create subdirectories named after values of a nested column, 
> location.transId.
>
> This is my first attempt:
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> It fails with the following errors:
> > Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> > Partition column 'location.transId' not defined in the table schema. 
> > Available columns: ['type', 'location']
> > at 
> > org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> > at 
> > org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> > at 
> > org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> > at 
> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> > at 
> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > at 
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>
> As It seems like nested columns are not recognized as a eligible column for 
> PARTITIONED BY, I tried the following:
> > CREATE TABLE output (
> >   `partition` AS location.transId
> > ) PARTITIONED BY (`partition`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
> It also fails:
> >  Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> > The field count of logical schema of the table does not match with the 
> > field count of physical schema
> . The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` 
> STRING>,STRING].
>
> Thanks in advance,
>
> Dongwon


Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Jingsong Li
Hi Paul,

If your orc table has no complex(list,map,row) types, you can try to set
`table.exec.hive.fallback-mapred-writer` to false in TableConfig. And Hive
sink will use ORC native writer, it is a work-around way.

About this error, I think this is a bug for Hive 1.1 ORC. I will try to
re-produce it.

I created https://issues.apache.org/jira/browse/FLINK-18659 to track this.
If it is a bug, it should be fixed in 1.11.2

Best,
Jingsong

On Tue, Jul 21, 2020 at 4:25 PM Rui Li  wrote:

> Hey Paul,
>
> Could you please share more about your job, e.g. the schema of your Hive
> table, whether it's partitioned, and the table properties you've set?
>
> On Tue, Jul 21, 2020 at 4:02 PM Paul Lam  wrote:
>
>> Hi,
>>
>> I'm doing a POC on Hive connectors and find that when writing orc format
>> Hive tables, the job failed with FileNotFoundException right after
>> ingesting data (full stacktrace at the bottom of the mail).
>>
>> The error can be steadily reproduced in my environment, which is Hadoop
>> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens
>> in orc tables, while other bulk formats are fine.
>>
>> Does anyone have an idea about this error? Any comment and suggestions
>> are appreciated. Thanks!
>>
>> Stacktrace:
>>
>> Caused by: java.io.FileNotFoundException: File does not exist:
>> hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
>> at
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
>> at
>> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
>> at
>> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>> at
>> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at StreamExecCalc$2.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at SourceConversion$1.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> at
>> org.apache.flink.streaming.

How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Hi,

I want to create subdirectories named after values of a nested column,
location.transId.

This is my first attempt:

> CREATE TABLE output
> PARTITIONED BY (`location.transId`)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 'east-out',
>   'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>

It fails with the following errors:

> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Partition column 'location.transId' not defined in the table schema.
> Available columns: ['type', 'location']
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> at
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>

As It seems like nested columns are not recognized as a eligible column for
PARTITIONED BY, I tried the following:

> CREATE TABLE output (
>   `partition` AS location.transId
> ) PARTITIONED BY (`partition`)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 'east-out',
>   'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>
It also fails:

>  Exception in thread "main"
> org.apache.flink.table.api.ValidationException: The field count of logical
> schema of the table does not match with the field count of physical schema

. The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
STRING>]
The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
STRING>,STRING].

Thanks in advance,

Dongwon


Re: How to get flink JobId in runtime

2020-07-21 Thread Yangze Guo
Hi Si-li,

Just a reminder that it is not the right way to get JobId because the
`StreamTask` is actually an internal class. For more discussion about
it, please refer to [1] and [2]. You could get JobId through this way
at the moment. Please keep in mind that it is not a stable contract.

[1] https://issues.apache.org/jira/browse/FLINK-17862
[2] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-do-I-get-the-IP-of-the-master-and-slave-files-programmatically-in-Flink-td35299.html

Best,
Yangze Guo

On Tue, Jul 21, 2020 at 4:42 PM Si-li Liu  wrote:
>
> I figure out another way, wrapper my function in a custom StreamOperator that 
> extends AbstractUdfStreamOperator, then I can use 
> this.getContainingTask.getEnvironment.getJobId
>
> Congxian Qiu  于2020年7月21日周二 上午11:49写道:
>>
>> Hi Sili
>>
>> I'm not sure if there are other ways to get this value properly. Maybe 
>> you can try 
>> `RuntimeContext.getMetricGroup().getAllVariables().get("")`.
>>
>> Best,
>> Congxian
>>
>>
>> Si-li Liu  于2020年7月20日周一 下午7:38写道:
>>>
>>> Hi
>>>
>>> I want to retrieve flink JobId in runtime, for example, during 
>>> RichFunction's open method. Is there anyway to do it?
>>>
>>> I checked the methods in RuntimeContext and ExecutionConfig, seems I can't 
>>> get this information from them.
>>>
>>> Thanks!
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>
>
>
> --
> Best regards
>
> Sili Liu


Re: DynamoDB sink

2020-07-21 Thread Robert Metzger
Hi Lorenzo,

I'm not aware of any well-maintained DynamoDB Sink for Flink. I created a
JIRA ticket to track requests for it earlier this year:
https://issues.apache.org/jira/browse/FLINK-16504


On Fri, Jul 17, 2020 at 5:40 PM Lorenzo Nicora 
wrote:

> Hi
>
> I was wondering whether there is any reasonably optimised DynamoDB Sink
> I am surprised I only found some old, partial discussions about
> implementing your own one.
>
> Am I the only one with the requirement of sending output to DynamoDB?
> Am I missing something obvious?
> I am obviously looking for an idempotent, at-least-once sink.
>
> Cheers
> Lorenzo
>


Re: How to get flink JobId in runtime

2020-07-21 Thread Si-li Liu
I figure out another way, wrapper my function in a custom StreamOperator
that extends AbstractUdfStreamOperator, then I can use
this.getContainingTask.getEnvironment.getJobId

Congxian Qiu  于2020年7月21日周二 上午11:49写道:

> Hi Sili
>
> I'm not sure if there are other ways to get this value properly. Maybe
> you can try
> `RuntimeContext.getMetricGroup().getAllVariables().get("")`.
>
> Best,
> Congxian
>
>
> Si-li Liu  于2020年7月20日周一 下午7:38写道:
>
>> Hi
>>
>> I want to retrieve flink JobId in runtime, for example, during
>> RichFunction's open method. Is there anyway to do it?
>>
>> I checked the methods in RuntimeContext and ExecutionConfig, seems I
>> can't get this information from them.
>>
>> Thanks!
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu


Re: Key group is not in KeyGroupRange

2020-07-21 Thread Robert Metzger
Looks like this thread is already being resolved in
https://issues.apache.org/jira/browse/FLINK-18637

On Tue, Jul 21, 2020 at 10:26 AM Robert Metzger  wrote:

> Hi Ori,
> thanks a lot for your email. Which version of Flink are you using?
>
> On Tue, Jul 14, 2020 at 1:03 PM Ori Popowski  wrote:
>
>> I'm getting this error when creating a savepoint. I've read in
>> https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by
>> unstable hashcode or equals on the key, or improper use of
>> reinterpretAsKeyedStream.
>>
>> My key is a string and I don't use reinterpretAsKeyedStream, so what's
>> going on?
>>
>> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
>> 962fc8e984e7ca1ed65a038aa62ce124 failed.
>> at
>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.CompletionException:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>> at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>> at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(Co

Re: FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Rui Li
Hey Paul,

Could you please share more about your job, e.g. the schema of your Hive
table, whether it's partitioned, and the table properties you've set?

On Tue, Jul 21, 2020 at 4:02 PM Paul Lam  wrote:

> Hi,
>
> I'm doing a POC on Hive connectors and find that when writing orc format
> Hive tables, the job failed with FileNotFoundException right after
> ingesting data (full stacktrace at the bottom of the mail).
>
> The error can be steadily reproduced in my environment, which is Hadoop
> 2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens
> in orc tables, while other bulk formats are fine.
>
> Does anyone have an idea about this error? Any comment and suggestions are
> appreciated. Thanks!
>
> Stacktrace:
>
> Caused by: java.io.FileNotFoundException: File does not exist:
> hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
> at
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
> at
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> at
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at StreamExecCalc$2.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$1.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
> Best,
> Paul Lam
>
>

-- 
Best regards!
Rui Li


Re: Map type param escaping :

2020-07-21 Thread Robert Metzger
Cool! Thanks for sharing the solution!

On Tue, Jul 14, 2020 at 11:39 PM Bohinski, Kevin 
wrote:

> Figured it out, pulled StructuredOptionsSplitter into a debugger and was
> able to get it working with:
>
> -Dkubernetes.jobmanager.annotations="\"KEY:\"\"V:A:L:U:E\"\"\""
>
>
>
> Best
>
> kevin
>


Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread David Magalhães
Hi Till, I'm using s3:// schema, but not sure what was the default used if
s3a or s3p.

then the state backend should try to directly write to the target file
> system


That was the behaviour that I saw the second time I've run this with more
slots. Does the savepoint write directly to S3 via streaming or write the
savepoint to memory first before sending to S3?

Thanks,
David

On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann  wrote:

> Hi David,
>
> which S3 file system implementation are you using? If I'm not mistaken,
> then the state backend should try to directly write to the target file
> system. If this should result in temporary files on your TM, then this
> might be a problem of the file system implementation. Having access to the
> logs could also help to better understand whats going on.
>
> Cheers,
> Till
>
> On Tue, Jul 14, 2020 at 11:57 AM David Magalhães 
> wrote:
>
>> Hi Congxian, sorry for the late reply.
>>
>> I'm using the filesystem with an S3 path as the default state backend in
>> flink-conf.yml (state.backend: filesystem).
>> The Flink version I'm using is 1.10.1.
>>
>> By "The task manager did not clean up the state", I mean what the
>> taskmanager was writing on disk the savepoint file, but it didn't delete it
>> after the other taskmanager had an issue with the disk being full. The
>> expected scenario would be both taskmanagers remove the savepoint they were
>> trying to do from the disk, but only the one that reached 100% disk space
>> use did it.
>>
>> For my scenario, I'm using the Flink REST API to start/deploy jobs. A
>> retained checkpoint isn't supported in REST API and even if it was, I think
>> it doesn't fit my scenario (stop a job, and start the new one from the
>> saved state).
>>
>> Thanks,
>> David
>>
>> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu 
>> wrote:
>>
>>> Hi David
>>>
>>> As you say the savepoint use local disk, I assume that you use
>>> RocksDBStateBackend.
>>> What's the flink version are you using now?
>>>
>>> What do you mean "The task manager did not clean up the state"?, does
>>> that mean the local disk space did not  clean up, do the task encounter
>>> failover in this period?
>>>
>>> The snapshot speed will be limited by the network bandwidth and the
>>> local io performance.
>>> IIUC, currently only checkpoint support local recovery
>>>
>>> PS: If you want the snapshot complete quickly, maybe you can try
>>> retained checkpoint[1], and multiple threads uploads[2]
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
>>> [2] https://issues.apache.org/jira/browse/FLINK-11008
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> David Magalhães  于2020年7月10日周五 下午7:37写道:
>>>
 Hi, yesterday when I was creating a savepoint (to S3, around 8GB of
 state) using 2 TaskManager (8 GB) and it failed because one of the task
 managers fill up the disk (probably didn't have enough RAM to save the
 state into S3 directly,I don't know what was the disk space, and reached
 100% usage space and the other one reached 99%).

 After the crash, the task manager that reach 100% deleted the "failed
 savepoint" from the local disk but the other one that reached 99% kept it.
 Shouldn't this task manager also clean up the failed state?

 After cleaning up the disk of that task manager, I've increased the
 parallelism to 6, created a new state of 8GB and all went smoothly, but it
 took 8 minutes to start processing in the new job created with the previous
 savepoint.

 [image: flink_grafana.png]
 Here is the network IO from the 6 task managers used and I have a few
 questions:

 - Isn't 25 Mbps of average speed a bit low? What could be the
 limitation?
 - For 8 GB of state, gives around 7 minutes to download it [ 8000 MB
 /(25Mbps/8*6 task managers)/60 seconds ], that should match the consistent
 part of 7/8 minute graph, and then started reading from Kafka topic.
 - Can I mitigate this with task local recovery [1]? Or is this only for
 a checkpoint ?
 - We are using *m5.xlarge* (4 vcpu, 16GB RAM) with 2 slots per TM.

 Thanks,
 David

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery

>>>


Re: Saving file to the ftp server

2020-07-21 Thread Robert Metzger
Hi Paweł,
I believe this is a bug. I don't think many people use Flink to write to an
FTP server, that's why this hasn't been addressed yet.
There's probably something off with the semantics of distributed vs
non-distributed file systems. I guess the easiest way to resolve this is by
running your Flink job out of an IDE, and attaching a debugger to the
FileOutputFormat.initializeGlobal() method to check what goes wrong.

Best,
Robert

On Fri, Jul 10, 2020 at 7:00 PM Paweł Goliszewski 
wrote:

> Hi to all,
>
>
>
> I tried to send a file from local storage to ftp server in docker
> container (image: stilliard/pure-ftpd) using Flink 1.10 with hadoop 2.8.5.
> I tried to do so with the following code:
>
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
> DataSource> lines = env.readCsvFile("
> file:///path/to/myfile.csv").types(String.class);
>
> lines.writeAsCsv("ftp://user:pass@localhost:21/test";);
>
>
>
> env.execute();
>
> After running it I get an exception with the following stacktrace:
>
> Exception in thread "main" java.lang.RuntimeException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>
> at
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:952)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
>
> at org.example.BatchJob.main(BatchJob.java:44)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:947)
>
> ... 3 more
>
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit job.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
>
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
> at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
>
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>
> ... 6 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
>
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:152)
>
> at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
>
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>
> ... 7 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink (CsvOutputFormat (path:
> ftp://goli:kotlet@localhost:21/test, delimiter: ,))': Output directory
> could not be created.
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(Scheduler

FileNotFoundException when writting Hive orc tables

2020-07-21 Thread Paul Lam
Hi,

I'm doing a POC on Hive connectors and find that when writing orc format Hive 
tables, the job failed with FileNotFoundException right after ingesting data 
(full stacktrace at the bottom of the mail).

The error can be steadily reproduced in my environment, which is Hadoop 
2.6.5(CDH-5.6.0), Hive 1.1.0(CDH-5.6.0) and Flink 1.11.0. It only happens in 
orc tables, while other bulk formats are fine.

Does anyone have an idea about this error? Any comment and suggestions are 
appreciated. Thanks!

Stacktrace:

Caused by: java.io.FileNotFoundException: File does not exist: 
hdfs://xxx/warehouse2/tmp_table/.part-6b51dbc2-e169-43a8-93b2-eb8d2be45054-0-0.inprogress.d77fa76c-4760-4cb6-bb5b-97d70afff000
at 
org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1218)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
at 
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
at 
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
at 
org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
at 
org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$2.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at SourceConversion$1.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


Best,
Paul Lam



Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-21 Thread Till Rohrmann
Two quick comments: With unaligned checkpoints which are released with
Flink 1.11.0, the problem of slow checkpoints under backpressure has been
resolved/mitigated to a good extent. Moreover, the community wants to work
on event time alignment for sources in the next release. This should
prevent that different sources diverge too much wrt event time.

Cheers,
Till

On Tue, Jul 7, 2020 at 2:48 AM David Magalhães 
wrote:

> Thanks for the reply Chen.
>
> My use case is a "simple" get from Kafka into S3. The job can read very
> quickly from Kafka and S3 is having some issues keeping up. The
> backpressure don't have enough time to actuate in this case, and when it
> reaches the checkpoint time some errors like heartbeat timeout or task
> manager didn't reply back starts to happen.
>
> I will investigate further and try this example.
>
> On Mon, Jul 6, 2020 at 5:45 PM Chen Qin  wrote:
>
>> My two cents here,
>>
>> - flink job already has back pressure so rate limit can be done via
>> setting parallelism to proper number in some use cases. There is an open
>> issue of checkpointing reliability when back pressure, community seems
>> working on it.
>>
>> - rate limit can be abused easily and cause lot of confusions. Think
>> about a use case where you have two streams do a simple interval join.
>> Unless you were able to rate limit both with proper value dynamiclly, you
>> might see timestamp and watermark gaps keep increasing causing
>> checkpointing failure.
>>
>> So the question might be, instead of looking at rate limit of one source,
>> how to slow down all sources without ever increasing time, wm gaps. It
>> sounds complicated already.
>>
>> with what being said, if you really want to have rate limit on your own,
>> you can try following code :) It works well for us.
>>
>> public class SynchronousKafkaConsumer extends FlinkKafkaConsumer {
>>
>>   protected static final Logger LOG = 
>> LoggerFactory.getLogger(SynchronousKafkaConsumer.class);
>>
>>   private final double topicRateLimit;
>>   private transient RateLimiter subtaskRateLimiter;
>>
>>
>> @Override
>> public void open(Configuration configuration) throws Exception {
>>   Preconditions.checkArgument(
>>   topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks() > 
>> 0.1,
>>   "subtask ratelimit should be greater than 0.1 QPS");
>>   subtaskRateLimiter = RateLimiter.create(
>>   topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks());
>>   super.open(configuration);
>> }
>>
>>
>> @Override
>> protected AbstractFetcher createFetcher(
>> SourceContext sourceContext,
>> Map partitionsWithOffsets,
>> SerializedValue> watermarksPeriodic,
>> SerializedValue> 
>> watermarksPunctuated,
>> StreamingRuntimeContext runtimeContext,
>> OffsetCommitMode offsetCommitMode,
>> MetricGroup consumerMetricGroup, boolean useMetrics)
>> throws Exception {
>>
>>   return new KafkaFetcher(
>>   sourceContext,
>>   partitionsWithOffsets,
>>   watermarksPeriodic,
>>   watermarksPunctuated,
>>   runtimeContext.getProcessingTimeService(),
>>   runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
>>   runtimeContext.getUserCodeClassLoader(),
>>   runtimeContext.getTaskNameWithSubtasks(),
>>   deserializer,
>>   properties,
>>   pollTimeout,
>>   runtimeContext.getMetricGroup(),
>>   consumerMetricGroup,
>>   useMetrics) {
>> @Override
>> protected void emitRecord(T record,
>>   KafkaTopicPartitionState 
>> partitionState,
>>   long offset) throws Exception {
>>   subtaskRateLimiter.acquire();
>>   if (record == null) {
>> consumerMetricGroup.counter("invalidRecord").inc();
>>   }
>>   super.emitRecord(record, partitionState, offset);
>> }
>>
>> @Override
>> protected void emitRecordWithTimestamp(T record,
>> 
>> KafkaTopicPartitionState partitionState,
>>long offset, long timestamp) 
>> throws Exception {
>>   subtaskRateLimiter.acquire();
>>   if (record == null) {
>> consumerMetricGroup.counter("invalidRecord").inc();
>>   }
>>   super.emitRecordWithTimestamp(record, partitionState, offset, 
>> timestamp);
>> }
>>   };
>>
>> }
>>
>> Thanks,
>>
>> Chen
>> Pinterest Data
>>
>>
>> On Jul 6, 2020, at 7:43 AM, David Magalhães 
>> wrote:
>>
>> I've noticed that this FLINK-11501 was implemented in
>> flink-connector-kafka-0.10 [1], but it wasn't in the current version of the
>> flink-connector-kafka. There is any reason for this, and why should be the
>> best solution to implement a rate limit functionality in the current Kafka
>> consumer?
>>
>> Thanks,
>> David
>>
>> [1]
>> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsum