flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-29 Thread Alex Wang
Hello everyone, I am a newbie.
I am learning the flink-sql-submit project from @Jark Wu:
https://github.com/wuchong/flink-sql-submit

My local environment is:
1. flink1.9.0 standalone
2. kafka_2.11-2.2.0 single

I added the Flink Connectors and Formats jars to $FLINK_HOME/lib.
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors

Then I ran flink-sql-submit with `sh run.sh q1`, and it threw
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.

My question is:
I configured mysql-connector-java in the pom.xml file, and the jar built by
Maven includes com.mysql.jdbc.Driver.
Why is this error still reported? If I put the jar in $FLINK_HOME/lib,
the problem is solved.
Do all these jars have to go into $FLINK_HOME/lib when the project depends on
many jar packages?
If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how can I solve
this problem?
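
(For reference, the workaround that currently works for me is roughly the following; the local Maven repository path is an assumption.)

# copy the JDBC driver into Flink's lib directory and restart the standalone cluster
cp ~/.m2/repository/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar "$FLINK_HOME/lib/"
"$FLINK_HOME"/bin/stop-cluster.sh && "$FLINK_HOME"/bin/start-cluster.sh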

Can @Jark Wu or someone else give me some advice?
Thank you.

1. pom.xml


> <dependency>
>     <groupId>mysql</groupId>
>     <artifactId>mysql-connector-java</artifactId>
>     <version>5.1.38</version>
> </dependency>
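
For reference, the fat jar is built so that the driver classes and the META-INF/services/java.sql.Driver entry end up inside it; a minimal sketch of such a shade configuration (the actual flink-sql-submit pom may differ):

<!-- Hypothetical maven-shade-plugin configuration; the real project pom may differ. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- merges META-INF/services files such as java.sql.Driver -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>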

2. mvn clean; mvn package

$ ll -rth target
>
>  [±master ●]
> total 32312
> drwxr-xr-x  3 alex  staff    96B Oct 30 11:32 generated-sources
> drwxr-xr-x  5 alex  staff   160B Oct 30 11:32 classes
> drwxr-xr-x  3 alex  staff    96B Oct 30 11:32 maven-archiver
> -rw-r--r--  1 alex  staff   7.2M Oct 30 11:32 flink-sql-submit-1.0-SNAPSHOT.jar
> -rw-r--r--  1 alex  staff   8.2M Oct 30 11:32 flink-sql-submit.jar
>

3. flink-sql-submit.jar includes META-INF/services/java.sql.Driver and the driver classes

" zip.vim version v28
> " Browsing zipfile
> /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
> " Select a file with cursor and press ENTER
>
> META-INF/MANIFEST.MF
> META-INF/
> q1.sql
> user_behavior.log
> com/
> com/github/
> com/github/wuchong/
> com/github/wuchong/sqlsubmit/
> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
> com/github/wuchong/sqlsubmit/SqlSubmit.class
> com/github/wuchong/sqlsubmit/SourceGenerator.class
> com/github/wuchong/sqlsubmit/cli/
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
> com/github/wuchong/sqlsubmit/cli/CliOptions.class
> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
> META-INF/maven/
> META-INF/maven/com.github.wuchong/
> META-INF/maven/com.github.wuchong/flink-sql-submit/
> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
> META-INF/services/
> META-INF/services/java.sql.Driver
> com/mysql/
> com/mysql/fabric/
> com/mysql/fabric/FabricCommunicationException.class
> com/mysql/fabric/FabricConnection.class
> com/mysql/fabric/FabricStateResponse.class
> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
> com/mysql/fabric/HashShardMapping.class
> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
> com/mysql/fabric/RangeShardMapping.class
> com/mysql/fabric/Response.class
> com/mysql/fabric/Server.class
> com/mysql/fabric/ServerGroup.class
> com/mysql/fabric/ServerMode.class
> com/mysql/fabric/ServerRole.class
> etc ...
>



$FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w
"${PROJECT_DIR}"/src/main/resources/ -f "$1".sql
Error:
2019-10-30 10:27:35
java.lang.IllegalArgumentException: JDBC driver class not found.
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:112)
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:66)
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:99)
	... 9 more


-- 
Best


Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

2019-10-29 Thread Jingsong Li
Hi anyang:

For your information, I plan to support the JSON format in the file system connector
after https://issues.apache.org/jira/browse/FLINK-14256
After FLIP-66[1], we can define a time attribute in SQL DDL whatever the
connector is.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL

On Wed, Oct 30, 2019 at 11:36 AM Jingsong Li  wrote:

> Hi anyang:
>
> For you information. I plan to support JSON format in file system
> connector after https://issues.apache.org/jira/browse/FLINK-14256
> After FLIP-66[1], we can define time attribute in SQL DDL whatever
> connector is.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
>
>
> On Tue, Oct 29, 2019 at 10:01 PM Anyang Hu  wrote:
>
>> Hi,
>>
>> Thanks Dawid and Florin.
>>
>> To Dawid:
>>
>> CsvTableSource doesn't implements DefinedProctimeAttribute and 
>> DefinedRowtimeAttributes interfaces, so we can not use proctime and rowtime 
>> in source ddl. Except csv, we also need to consume json and pb data.
>>
>>
>> To Florin:
>> Installing local kafka and zk introduces too many third-party
>> components and may be not universal.
>>
>> In my scenario, I need to run a local sql job to debug(for example source
>> and sink are kafka-json, dimension table is jdbc) before submit  it to
>> yarn. The following usage is what I want:
>> 1)generate local json data for source and dimension table (source table
>> supports proctime and rowtime);
>> 2)  replace `connetor.type` to 'filesystem';
>> 3)  add `connector.path`  to source table /dimension table ddl property;
>> 4)  new sql can run locally as data read from kafka and jdbc.
>>
>> Thanks,
>> Anyang
>>
>> Spico Florin  于2019年10月29日周二 下午6:35写道:
>>
>>> Hi!
>>>
>>>  Another solution would be to locally install kafka+zookeeper and push
>>> your dumped json (from the production server) data in a topic(you create a
>>> Kafka producer).
>>>  Then you configure your code to point to this local broker. Consume
>>> your data from topic from either strategy you need (earliest offset,
>>> latest).
>>> The advantage is that you can repeat your tests multiple times as in
>>> real scenario.
>>>
>>> Depending on your use case, there can be different behaviour of your
>>> processing pipeline when you consume from a file (batch) or from a stream
>>> (kafka).
>>> I had this kind of issue when some CEP functionalities.
>>> I hope it helps.
>>>  Regards,
>>>  Florin
>>>
>>>
>>> On Tue, Oct 29, 2019 at 12:00 PM Anyang Hu 
>>> wrote:
>>>
 Hi guys,

 In flink1.9, we can set `connector.type` to `kafka` and `format.type`
 to json to read/write json data from kafka or write json data to kafka.

 In my scenario, I wish to read local json data as a souce table, since
 I need to do local debug and don't consume online kafka data.

 For example:

> create table source (
> first varchar,
> id int
> ) with (
> 'connector.type' = 'filesystem',
> 'connector.path' = '/path/to/json',
> 'format.type' = 'json'
> )


 In addition, writing local json data is also needed.

 Does anyone have similar needs?

 Best regards,
 Anyang

>>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-29 Thread Yang Wang
Hi Chan,

If it is a bug, I think it is critical. Could you share the job manager
logs with me too? I have some time to
analyze them and hope to find the root cause.


Best,
Yang

Chan, Regina wrote on Wed, Oct 30, 2019 at 10:55 AM:

> Till, were you able find anything? Do you need more logs?
>
>
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Saturday, October 26, 2019 1:17 PM
> *To:* Chan, Regina [Engineering] 
> *Cc:* Yang Wang ; user 
> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
> about the number of pending container requests has diverged
>
>
>
> Forget my last email. I received the on time code and could access the
> logs.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Sat, Oct 26, 2019 at 6:49 PM Till Rohrmann 
> wrote:
>
> Hi Regina,
>
>
>
> I couldn't access the log files because LockBox asked to create a new
> password and now it asks me for the one time code to confirm this change.
> It says that it will send the one time code to my registered email which I
> don't have.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 25, 2019 at 10:14 PM Till Rohrmann 
> wrote:
>
> Great, thanks a lot Regina. I'll check the logs tomorrow. If info level is
> not enough, then I'll let you know.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 25, 2019, 21:20 Chan, Regina  wrote:
>
> Till, I added you to this lockbox area where you should be able to
> download the logs. You should have also received an email with an account
> created in lockbox where you can set a password. Let me know if you have
> any issues.
>
>
>
>
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Friday, October 25, 2019 1:24 PM
> *To:* Chan, Regina [Engineering] 
> *Cc:* Yang Wang ; user 
> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
> about the number of pending container requests has diverged
>
>
>
> Could you provide me with the full logs of the cluster
> entrypoint/JobManager. I'd like to see what's going on there.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 25, 2019, 19:10 Chan, Regina  wrote:
>
> Till,
>
>
>
> We’re still seeing a large number of returned containers even with this
> heart beat set to something higher. Do you have hints as to what’s going
> on? It seems to be bursty in nature. The bursty requests cause the job to
> fail with the cluster not having enough resources because it’s in the
> process of releasing them.
>
> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate enough slots to run the job. Please make sure that the
> cluster has enough resources.” It causes the job to run very
> inconsistently.
>
>
>
> Since legacy mode is now gone in 1.9, we don’t really see many options
> here.
>
>
>
> Run Profile                                                         | Number of returned excess containers
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=500  | 685
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=5000 | 552
> 12G per TM, 2 slots, yarn.heartbeat.container-request-interval=1    | 331
> 10G per TM, 1 slots, yarn.heartbeat.container-request-interval=6    | 478
>
>
>
> 2019-10-25 09:55:51,452 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying
> CHAIN DataSource (synonym | Read Staging From File System | AVRO) -> Map
> (Map at readAvroFileWithFilter(FlinkReadUtils.java:78)) -> Map (Key
> Extractor) (14/90) (attempt #0) to
> container_e22_1571837093169_78279_01_000852 @ d50503-004-e22.dc.gs.com
> (dataPort=33579)
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000909 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000909.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000910 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000910.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000911 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000911.
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e22_1571837093169_78279_01_000912 - Remaining
> pending container requests: 0
>
> 2019-10-25 09:55:51,513 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e22_1571837093169_78279_01_000912.
>
> 

Re: Flink batch app occasionally hang

2019-10-29 Thread Zhu Zhu
Hi Caio,

Did you check whether there are enough resources to launch the other nodes?

Could you attach the logs you mentioned? And elaborate how the tasks are
connected in the topology?


Thanks,
Zhu Zhu

Caio Aoque wrote on Wed, Oct 30, 2019 at 8:31 AM:

> Hi, I've been running some flink scala applications on an AWS EMR cluster
> (version 5.26.0 with flink 1.8.0 for scala 2.11) for a while and I
> started to have some issues now.
>
> I have a flink app that reads some files from S3, process them and save
> some files to s3 and also some records to a database.
>
> The application is not so complex it has a source that reads a directory
> (multiple files) and other one that reads a single one and then it has some
> grouping and mapping and a left outer join between these 2 sources.
>
> The issue is that occasionally the application got stuck with only two
> tasks running, one finished and the other ones not even run. The 2 tasks
> that keep running forever are the source1 from directory (multiple files)
> and the leftouterjoin, the source2 (input from a single file) is the one
> that finishes. One interest thing is that there should be several tasks
> between source 1 and this leftouterjoin but they remain in CREATED state.
> If the app stuck usually I simply kill that and run that again, which
> works. The issue is not that frequent but is getting more and more
> frequent. It's happening almost everyday now.
>
> I also have a DEBUG log from a job that didn't work and another one from a
> job that worked.
>
> Thanks.
>


Re: Flink batch app occasionally hang

2019-10-29 Thread vino yang
Hi Caio,

Because it involves interaction with external systems, it would be better
if you could provide the full logs.

Best,
Vino

Caio Aoque wrote on Wed, Oct 30, 2019 at 8:31 AM:

> Hi, I've been running some flink scala applications on an AWS EMR cluster
> (version 5.26.0 with flink 1.8.0 for scala 2.11) for a while and I
> started to have some issues now.
>
> I have a flink app that reads some files from S3, process them and save
> some files to s3 and also some records to a database.
>
> The application is not so complex it has a source that reads a directory
> (multiple files) and other one that reads a single one and then it has some
> grouping and mapping and a left outer join between these 2 sources.
>
> The issue is that occasionally the application got stuck with only two
> tasks running, one finished and the other ones not even run. The 2 tasks
> that keep running forever are the source1 from directory (multiple files)
> and the leftouterjoin, the source2 (input from a single file) is the one
> that finishes. One interest thing is that there should be several tasks
> between source 1 and this leftouterjoin but they remain in CREATED state.
> If the app stuck usually I simply kill that and run that again, which
> works. The issue is not that frequent but is getting more and more
> frequent. It's happening almost everyday now.
>
> I also have a DEBUG log from a job that didn't work and another one from a
> job that worked.
>
> Thanks.
>


Re: Flink checkpointing behavior

2019-10-29 Thread vino yang
Hi Amran,

See my inline answers.

Best,
Vino

amran dean wrote on Wed, Oct 30, 2019 at 2:59 AM:

> Hello,
> Exact semantics for checkpointing/task recovery are still a little
> confusing to me after parsing docs: so a few questions.
>
> - What does Flink consider a task failure? Is it any exception that the
> job does not handle?
>

*Flink considers a task failure to be any factor that makes the task itself
unable to continue running.*

>
> - Do the failure recovery strategies mentioned in
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>  refer
> to restarting from the most recent checkpoint?
> E.g for fixed-delay recoveries, a fixed number of restarts from a specific
> checkpoint are attempted.
>

*For an automatic restart, Flink will try to restore from the most recent checkpoint.*
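
As an illustration only (not from the original thread), a fixed-delay restart strategy can be configured like this; on each automatic restart Flink restores the job from the latest completed checkpoint:

// Minimal sketch; the values are arbitrary examples.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // take a checkpoint every 60 seconds
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                                 // at most 3 restart attempts
                Time.of(10, TimeUnit.SECONDS)));   // wait 10 seconds between attempts
        // ... define sources, transformations and sinks, then env.execute("job");
    }
}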

>
> - The docs mention the following command to resume from a checkpoint. In
> the checkpoint metadata path I have configured, I only see a series of
> directories named by hashes:
>
> - 24c8d7a38dd90ca8bd5f04c36d1442ba
> - shared
> - taskowned
> - 5d202a0ba04cdc1b917892c1e35d00dc
> - shared
> - taskowned
> How do I know which is the most recent checkpoint?
>

*In the checkpoint directory corresponding to the job ID, you should see
folders named like "chk-xxx"; specify that path. For more details,
please see [1].*

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
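
*For illustration only (the checkpoint path and jar name below are made up), resuming from a retained checkpoint uses the same -s option as a savepoint:*

# list the chk-* folders under the job's checkpoint directory; the highest
# numbered chk-<n> is the most recent completed checkpoint
hdfs dfs -ls /flink/checkpoints/24c8d7a38dd90ca8bd5f04c36d1442ba/

# resume from that checkpoint
bin/flink run -s hdfs:///flink/checkpoints/24c8d7a38dd90ca8bd5f04c36d1442ba/chk-42 my-job.jar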

>
> Really appreciate any help. Thank you.
>


flink1.9.1 on yarn sql deployment question

2019-10-29 Thread hb
hello:


Environment:  flink1.9.1, on yarn, hadoop2.6
Flink is installed only on the single machine used for submitting jobs.


The lib directory contains:
 flink-dist_2.11-1.9.1.jar
 flink-json-1.9.0-sql-jar.jar
 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
 flink-sql-connector-kafka_2.11-1.9.0.jar
 flink-table_2.11-1.9.1.jar
 flink-table-blink_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar


//
 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar,
 flink-sql-connector-kafka_2.11-1.9.0.jar
 flink-json-1.9.0-sql-jar.jar
These 3 jars were copied in after installation.


Question 1: In on-yarn mode, do I need to install the Flink distribution on every machine, or is it enough to have it only on the submitting machine?


Question 2: I need to use the blink planner to connect to an external Kafka (version 1.1, JSON format) to create SQL tables. Do I need to add
 flink-sql-connector-kafka_2.11-1.9.0.jar
 flink-json-1.9.0-sql-jar.jar
to the lib directory, or should I declare the dependencies in the pom file and build a fat jar?

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

Question 3: When running flink run on yarn, will the jars under the lib directory be shipped along with the user jar and submitted to yarn?







Flink batch app occasionally hang

2019-10-29 Thread Caio Aoque
Hi, I've been running some Flink Scala applications on an AWS EMR cluster
(version 5.26.0 with Flink 1.8.0 for Scala 2.11) for a while, and I have
started to have some issues now.

I have a Flink app that reads some files from S3, processes them, and saves
some files to S3 as well as some records to a database.

The application is not very complex: it has one source that reads a directory
(multiple files) and another that reads a single file, and then it does some
grouping and mapping and a left outer join between these two sources.

The issue is that occasionally the application gets stuck with only two
tasks running, one finished, and the other ones never even run. The two tasks
that keep running forever are source1 from the directory (multiple files)
and the leftouterjoin; source2 (input from a single file) is the one
that finishes. One interesting thing is that there should be several tasks
between source1 and this leftouterjoin, but they remain in CREATED state.
If the app gets stuck I usually simply kill it and run it again, which
works. The issue is not that frequent but is getting more and more
frequent. It's happening almost every day now.

I also have a DEBUG log from a job that didn't work and another one from a
job that worked.

Thanks.


Flink checkpointing behavior

2019-10-29 Thread amran dean
Hello,
Exact semantics for checkpointing/task recovery are still a little
confusing to me after parsing docs: so a few questions.

- What does Flink consider a task failure? Is it any exception that the job
does not handle?

- Do the failure recovery strategies mentioned in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
refer
to restarting from the most recent checkpoint?
E.g for fixed-delay recoveries, a fixed number of restarts from a specific
checkpoint are attempted.

- The docs mention the following command to resume from a checkpoint. In
the checkpoint metadata path I have configured, I only see a series of
directories named by hashes:

- 24c8d7a38dd90ca8bd5f04c36d1442ba
- shared
- taskowned
- 5d202a0ba04cdc1b917892c1e35d00dc
- shared
- taskowned
How do I know which is the most recent checkpoint?

Really appreciate any help. Thank you.


How to stream intermediate data that is stored in external storage?

2019-10-29 Thread kant kodali
Hi All,

I want to do a full outer join on two streaming data sources and store the
state of the full outer join in some external storage like RocksDB or something
else. Then I want to use this intermediate state as a streaming source
again, do some transformations, and write it to some external store. Is
that possible with Flink 1.9?

Also, what storage systems support a push mechanism for the intermediate data?
For example, in the use case above, does RocksDB support pushing/emitting events
in a streaming fashion?

Thanks!


Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

2019-10-29 Thread Anyang Hu
Hi,

Thanks Dawid and Florin.

To Dawid:

CsvTableSource doesn't implement the DefinedProctimeAttribute and
DefinedRowtimeAttributes interfaces, so we cannot use proctime and
rowtime in the source DDL. Besides CSV, we also need to consume JSON and PB
data.


To Florin:
Installing a local Kafka and ZooKeeper introduces too many third-party
components and may not be universally applicable.

In my scenario, I need to run a local SQL job for debugging (for example, the
source and sink are kafka-json and the dimension table is jdbc) before submitting
it to yarn. The following usage is what I want:
1) generate local JSON data for the source and dimension tables (the source table
supports proctime and rowtime);
2) replace `connector.type` with 'filesystem';
3) add `connector.path` to the source table / dimension table DDL properties;
4) the new SQL can run locally as if the data were read from kafka and jdbc.

Thanks,
Anyang

Spico Florin wrote on Tue, Oct 29, 2019 at 6:35 PM:

> Hi!
>
>  Another solution would be to locally install kafka+zookeeper and push
> your dumped json (from the production server) data in a topic(you create a
> Kafka producer).
>  Then you configure your code to point to this local broker. Consume your
> data from topic from either strategy you need (earliest offset, latest).
> The advantage is that you can repeat your tests multiple times as in real
> scenario.
>
> Depending on your use case, there can be different behaviour of your
> processing pipeline when you consume from a file (batch) or from a stream
> (kafka).
> I had this kind of issue when some CEP functionalities.
> I hope it helps.
>  Regards,
>  Florin
>
>
> On Tue, Oct 29, 2019 at 12:00 PM Anyang Hu  wrote:
>
>> Hi guys,
>>
>> In flink1.9, we can set `connector.type` to `kafka` and `format.type` to
>> json to read/write json data from kafka or write json data to kafka.
>>
>> In my scenario, I wish to read local json data as a souce table, since I
>> need to do local debug and don't consume online kafka data.
>>
>> For example:
>>
>>> create table source (
>>> first varchar,
>>> id int
>>> ) with (
>>> 'connector.type' = 'filesystem',
>>> 'connector.path' = '/path/to/json',
>>> 'format.type' = 'json'
>>> )
>>
>>
>> In addition, writing local json data is also needed.
>>
>> Does anyone have similar needs?
>>
>> Best regards,
>> Anyang
>>
>


low performance in running queries

2019-10-29 Thread Habib Mostafaei

Hi all,

I am running Flink on a standalone cluster and getting a very long 
execution time for streaming queries like WordCount on a fixed text 
file. My VM runs Debian 10 with 16 CPU cores and 32GB of RAM. I 
have a text file with a size of 2GB. When I run Flink on a standalone 
cluster, i.e., one JobManager and one TaskManager with 25GB of heap size, 
it takes around two hours to finish counting this file, while a simple 
Python script can do it in around 7 minutes. I am just wondering what is 
wrong with my setup. I ran the experiments on a cluster with six 
TaskManagers, but I still get a very long execution time, like 25 minutes 
or so. I tried to increase the JVM heap size to get a lower execution 
time, but it did not help. I attached the log file and the Flink 
configuration file to this email.


Best,

Habib


#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.



#==
# Common
#==

# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host  parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.

jobmanager.rpc.address: localhost

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123


# The heap size for the JobManager JVM

jobmanager.heap.size: 25000m


# The heap size for the TaskManager JVM

taskmanager.heap.size: 25000m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1

# The default file system scheme and authority.
# 
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==
# High Availability
#==

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# 
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
#
# high-availability.storageDir: hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181


# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==
# Fault tolerance and checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#
# state.backend: 

Streaming File Sink - Parquet File Writer

2019-10-29 Thread Vinay Patil
Hi,

I am not able to roll the files based on file size, as the bulk format only has
the OnCheckpointRollingPolicy.

One way is to write a custom StreamingFileSink and provide a RollingPolicy the
way the RowFormatBuilder does. Is this the correct way to go?

Another way is to write a Parquet Encoder and use the RowFormatBuilder.

P.S. Curious to know why the RollingPolicy was not exposed in the case of the
bulk format.
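
For context, this is roughly the bulk-format usage I mean (MyRecord and the path are placeholders):

// Sketch only; MyRecord is a placeholder record type and the path is made up.
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<MyRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://my-bucket/output"),
                ParquetAvroWriters.forReflectRecord(MyRecord.class))
        // no withRollingPolicy(...) here: bulk formats currently roll only on checkpoints
        .build();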

Regards,
Vinay Patil


Re: Flink 1.8.1 HDFS 2.6.5 issue

2019-10-29 Thread Dian Fu
You could also disable the security feature of the Hadoop cluster or upgrade 
the Hadoop version. I'm not sure if this is acceptable for you, as it requires 
more changes. Setting the configuration is the minimal change I could think of 
to solve this issue, as it will not affect other users of the Hadoop cluster. 
You could also turn to the Hadoop community to see if they can provide some 
help, as this is actually a Hadoop problem.
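
For illustration, the configuration mentioned below would go into the client-side core-site.xml (or the equivalent Hadoop configuration picked up by Flink) roughly like this:

<!-- Sketch only; where this file lives depends on your Hadoop setup. -->
<property>
  <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
  <value>org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec,org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
</property>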

> On Oct 29, 2019, at 2:25 PM, V N, Suchithra (Nokia - IN/Bangalore) 
>  wrote:
> 
> Thanks for the information. Without setting such parameter explicitly, is 
> there any possibility that it may work intermittently?
>  
> From: Dian Fu  
> Sent: Tuesday, October 29, 2019 7:12 AM
> To: V N, Suchithra (Nokia - IN/Bangalore) 
> Cc: user@flink.apache.org
> Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue
>  
> I guess this is a bug in Hadoop 2.6.5 and has been fixed in Hadoop 2.8.0 [1]. 
> You can work around it by explicitly setting the configration 
> "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" as 
> "org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec, 
> org.apache.hadoop.crypto.JceAesCtrCryptoCodec".
>  
> [1] https://issues.apache.org/jira/browse/HADOOP-11711 
> 
>  
> On Oct 28, 2019, at 8:59 PM, V N, Suchithra (Nokia - IN/Bangalore) 
> mailto:suchithra@nokia.com>> wrote:
>  
> Hi,
> From debug logs I could see below logs in taskmanager. Please have a look.
>  
> org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
> org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
> org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
> org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
> for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
> for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 
> 3des"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 
> 3des"}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
> [auth-conf], negotiated QOP = auth-conf"}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
> [auth-conf], negotiated QOP = auth-conf"}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
> CryptoInputStream and CryptoOutputStream."}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
> CryptoInputStream and CryptoOutputStream."}
> o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
> suite configured."}
> o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
> suite configured."}
> org.apache.hadoop.hdfs.DFSClient DataStreamer Exception"}
> java.lang.NullPointerException: null
>   at 
> org.apache.hadoop.crypto.CryptoInputStream.(CryptoInputStream.java:132)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)
>  
> Regards,
> Suchithra
>  
> From: Dian Fu mailto:dian0511...@gmail.com>> 
> Sent: Monday, October 28, 2019 5:40 PM
> To: V N, Suchithra (Nokia - IN/Bangalore)  >
> Cc: user@flink.apache.org 
> Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue
>  
> It seems that the CryptoCodec is null from the exception stack trace. This 
> may occur when "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" is 
> 

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

2019-10-29 Thread Spico Florin
Hi!

 Another solution would be to locally install Kafka + ZooKeeper and push your
dumped JSON data (from the production server) into a topic (you create a Kafka
producer).
 Then you configure your code to point to this local broker. Consume your
data from the topic with whichever strategy you need (earliest offset, latest).
The advantage is that you can repeat your tests multiple times as in a real
scenario.

Depending on your use case, there can be different behaviour of your
processing pipeline when you consume from a file (batch) or from a stream
(kafka).
I had this kind of issue when using some CEP functionalities.
I hope it helps.
 Regards,
 Florin


On Tue, Oct 29, 2019 at 12:00 PM Anyang Hu  wrote:

> Hi guys,
>
> In flink1.9, we can set `connector.type` to `kafka` and `format.type` to
> json to read/write json data from kafka or write json data to kafka.
>
> In my scenario, I wish to read local json data as a souce table, since I
> need to do local debug and don't consume online kafka data.
>
> For example:
>
>> create table source (
>> first varchar,
>> id int
>> ) with (
>> 'connector.type' = 'filesystem',
>> 'connector.path' = '/path/to/json',
>> 'format.type' = 'json'
>> )
>
>
> In addition, writing local json data is also needed.
>
> Does anyone have similar needs?
>
> Best regards,
> Anyang
>


Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

2019-10-29 Thread Dawid Wysakowicz
Hi,

Unfortunately it is not possible out of the box. The only format that
the filesystem connector supports as of now is CSV.

As a workaround you could create a Table out of a DataStream reusing the
JsonRowDeserializationSchema. Have a look at the example below:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<byte[]> input = env.fromElements(
            "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
        ); // or read from file record by record

        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(...).build();

        TypeInformation<Row> producedType = jsonSchema.getProducedType();
        SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
            .returns(producedType);

        tEnv.registerDataStream("t", in);

        Table table = tEnv.sqlQuery("SELECT * FROM t");

Best,

Dawid

On 29/10/2019 10:59, Anyang Hu wrote:
> Hi guys,
>
> In flink1.9, we can set `connector.type` to `kafka` and `format.type`
> to json to read/write json data from kafka or write json data to kafka.
>
> In my scenario, I wish to read local json data as a souce table, since
> I need to do local debug and don't consume online kafka data.
>
> For example:
>
> create table source (
> first varchar,
> id int
> ) with (
> 'connector.type' = 'filesystem',
> 'connector.path' = '/path/to/json',
> 'format.type' = 'json'
> )
>
>
> In addition, writing local json data is also needed.
>
> Does anyone have similar needs?
>
> Best regards,
> Anyang



[FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

2019-10-29 Thread Anyang Hu
Hi guys,

In flink1.9, we can set `connector.type` to `kafka` and `format.type` to
json to read/write json data from kafka or write json data to kafka.

In my scenario, I wish to read local JSON data as a source table, since I
need to do local debugging and don't want to consume online kafka data.

For example:

> create table source (
> first varchar,
> id int
> ) with (
> 'connector.type' = 'filesystem',
> 'connector.path' = '/path/to/json',
> 'format.type' = 'json'
> )


In addition, writing local json data is also needed.

Does anyone have similar needs?

Best regards,
Anyang


Re: Add custom fields into Json

2019-10-29 Thread Jingsong Li
Hi Srikanth,
Which SQL statement throws the exception?

On Tue, Oct 29, 2019 at 3:41 PM vino yang  wrote:

> Hi,
>
> The exception shows your SQL statement has grammatical errors. Please
> check it again or provide the whole SQL statement here.
>
> Best,
> Vino
>
srikanth flink wrote on Tue, Oct 29, 2019 at 2:51 PM:
>
>> Hi there,
>>
>> I'm querying json data and is working fine. I would like to add custom
>> fields including the query result.
>> My query looks like: select ROW(`source`),  ROW(`destination`),
>> ROW(`dns`), organization, cnt from (select
>> (source.`ip`,source.`isInternalIP`) as source,
>> (destination.`ip`,destination.`isInternalIP`) as destination,
>>  (dns.`query`, dns.`answers`.`data`) as dns, organization as organization
>> from dnsTableS) tab1;
>>
>> While I would like to add "'dns' as `agg.type`, 'dns' as `agg.name`" to
>> the same output, but the query is throw exceptions:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line
>> 1, column 278.
>> Was expecting one of:
>> "EXCEPT" ...
>> "FETCH" ...
>> "FROM" ...
>> "INTERSECT" ...
>> "LIMIT" ...
>> "OFFSET" ...
>> "ORDER" ...
>> "MINUS" ...
>> "UNION" ...
>> ")" ...
>> "," ...
>>
>> Could someone help me with this? Thanks
>>
>> Srikanth
>>
>

-- 
Best, Jingsong Lee


Re: How to use two continuously window with EventTime in sql

2019-10-29 Thread Jark Wu
Hi,

You can use TUMBLE_ROWTIME(...) to get the rowtime attribute of the first
window's result, and use this field to apply a following window aggregate.
See more at
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows
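
For example, a sketch with made-up table and column names (a table my_table with a rowtime attribute ts):

-- inner query: 1-minute tumbling window; TUMBLE_ROWTIME exposes a rowtime attribute
-- outer query: 5-minute tumbling window over the first window's results
SELECT
  TUMBLE_START(w_rowtime, INTERVAL '5' MINUTE) AS window_start,
  SUM(cnt) AS total_cnt
FROM (
  SELECT
    TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) AS w_rowtime,
    COUNT(*) AS cnt
  FROM my_table
  GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)
) t
GROUP BY TUMBLE(w_rowtime, INTERVAL '5' MINUTE)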

Best,
Jark

On Tue, 29 Oct 2019 at 15:39, 刘建刚  wrote:

>   For one sql window, I can register table with event time and use
> time field in the tumble window. But if I want to use the result for the
> first window and use another window to process it, how can I do it? Thank
> you.
>


Re: Reply: flink1.9.1 Kafka table reading problem

2019-10-29 Thread hb
pom file

```

http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
4.0.0


com.hb
flink
pom
1.9.1-SNAPSHOT



1.8
UTF-8
1.9.1
2.11
2.11.12
6.2.3
1.72
2.6.2
0.11.0.2
1.2.46
1.9.1
1.2.17
5.1.42
4.18.1
3.6.0
compile

compile




org.apache.flink
flink-core
${flink.version}
${flink.scope.type}




org.apache.flink
flink-clients_2.11
${flink.version}
${flink.scope.type}




org.apache.flink
flink-scala_2.11
${flink.version}
${flink.scope.type}




org.apache.flink
flink-streaming-scala_2.11
${flink.version}
${flink.scope.type}





org.apache.flink
flink-table-common
${flink.version}




org.apache.flink
flink-table-api-scala-bridge_2.11
${flink.version}





org.apache.flink
flink-table-api-java-bridge_2.11
${flink.version}





org.apache.flink
flink-table-planner_2.11
${flink.version}





org.apache.flink
flink-table-runtime-blink_2.11
${flink.version}






org.apache.flink
flink-table-planner-blink_2.11
${flink.version}





org.apache.flink
flink-connector-kafka-0.11_2.11
${flink-connector-kafka}







org.apache.flink
flink-connector-kafka_2.11
${flink.version}






org.apache.flink
flink-json
${flink.version}




org.apache.flink
flink-connector-elasticsearch6_2.11
${flink.version}






org.apache.flink
flink-runtime-web_2.11
${flink.version}








org.elasticsearch
elasticsearch-hadoop
${elasticsearch.hadoop}
${scope.type}



org.scala-lang
scala-library
${scala.version}
${scope.type}


org.scala-lang
scala-reflect
${scala.version}
${scope.type}







org.apache.kafka
kafka-clients
${kafka.version}











com.beust
jcommander
${jcommander.version}




com.google.code.gson
gson
${gson.version}




com.alibaba
fastjson
${fastjson.version}






log4j
log4j
${log4j.version}






mysql
mysql-connector-java
${mysql-connector-java.version}




net.dongliu
requests
${net.dongliu.requests.version}













org.apache.maven.plugins
maven-compiler-plugin
${maven-compiler-plugin.version}

${java.version}
${java.version}
${project.build.sourceEncoding}



net.alchim31.maven
scala-maven-plugin
3.3.2


scala-compile-first
process-resources

compile



scala-test-compile-first
process-test-resources

testCompile



attach-scaladocs
verify

doc-jar







org.apache.maven.plugins
maven-shade-plugin
3.1.1


   

Re: Add custom fields into Json

2019-10-29 Thread Jingsong Li
Hi Srikanth,
1. Can you share the complete SQL and the schema of the input table?
2. What schema do you want to select? I didn't understand what the "agg" field
means.

On Tue, Oct 29, 2019 at 3:28 PM Jingsong Li  wrote:

> Hi Srikanth,
> 1.Can you share complete sql? And schema of input table?
> 2.What schema you want to select? I didn't understand what is "agg" field
> mean.
>
> On Tue, Oct 29, 2019 at 2:51 PM srikanth flink 
> wrote:
>
>> Hi there,
>>
>> I'm querying json data and is working fine. I would like to add custom
>> fields including the query result.
>> My query looks like: select ROW(`source`),  ROW(`destination`),
>> ROW(`dns`), organization, cnt from (select
>> (source.`ip`,source.`isInternalIP`) as source,
>> (destination.`ip`,destination.`isInternalIP`) as destination,
>>  (dns.`query`, dns.`answers`.`data`) as dns, organization as organization
>> from dnsTableS) tab1;
>>
>> While I would like to add "'dns' as `agg.type`, 'dns' as `agg.name`" to
>> the same output, but the query is throw exceptions:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line
>> 1, column 278.
>> Was expecting one of:
>> "EXCEPT" ...
>> "FETCH" ...
>> "FROM" ...
>> "INTERSECT" ...
>> "LIMIT" ...
>> "OFFSET" ...
>> "ORDER" ...
>> "MINUS" ...
>> "UNION" ...
>> ")" ...
>> "," ...
>>
>> Could someone help me with this? Thanks
>>
>> Srikanth
>>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Add custom fields into Json

2019-10-29 Thread vino yang
Hi,

The exception shows that your SQL statement has syntax errors. Please check
it again or provide the whole SQL statement here.

Best,
Vino

srikanth flink wrote on Tue, Oct 29, 2019 at 2:51 PM:

> Hi there,
>
> I'm querying json data and is working fine. I would like to add custom
> fields including the query result.
> My query looks like: select ROW(`source`),  ROW(`destination`),
> ROW(`dns`), organization, cnt from (select
> (source.`ip`,source.`isInternalIP`) as source,
> (destination.`ip`,destination.`isInternalIP`) as destination,
>  (dns.`query`, dns.`answers`.`data`) as dns, organization as organization
> from dnsTableS) tab1;
>
> While I would like to add "'dns' as `agg.type`, 'dns' as `agg.name`" to
> the same output, but the query is throw exceptions:
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line
> 1, column 278.
> Was expecting one of:
> "EXCEPT" ...
> "FETCH" ...
> "FROM" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ORDER" ...
> "MINUS" ...
> "UNION" ...
> ")" ...
> "," ...
>
> Could someone help me with this? Thanks
>
> Srikanth
>


Re: Using STSAssumeRoleSessionCredentialsProvider for cross account access

2019-10-29 Thread Vinay Patil
Thanks Fabian,

@Gordon - Can you please help here.

Regards,
Vinay Patil


On Fri, Oct 25, 2019 at 9:11 PM Fabian Hueske  wrote:

> Hi Vinay,
>
> Maybe Gordon (in CC) has an idea about this issue.
>
> Best, Fabian
>
> Am Do., 24. Okt. 2019 um 14:50 Uhr schrieb Vinay Patil <
> vinay18.pa...@gmail.com>:
>
>> Hi,
>>
>> Can someone pls help here , facing issues in Prod . I see the following
>> ticket in unresolved state.
>>
>> https://issues.apache.org/jira/browse/FLINK-8417
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Thu, Oct 24, 2019 at 11:01 AM Vinay Patil 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to access dynamo streams from a different aws account but
>>> getting resource not found exception while trying to access the dynamo
>>> streams from Task Manager. I have provided the following configurations :
>>>
>>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER,AWSConfigConstants.CredentialProvider.ASSUME_ROLE.name
>>> ());*
>>>
>>>
>>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN,dynamoDbConnect.getRoleArn());*
>>>
>>>
>>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME,dynamoDbConnect.getRoleSessionName());*
>>>
>>> In the main class I am able to get the arn of dynamoDb table
>>> using STSAssumeRoleSessionCredentialsProvider, so the assume role is
>>> working fine . Getting error only while accessing from TM.
>>>
>>> I assume that the credentials are not required to be passed :
>>> https://github.com/apache/flink/blob/abbd6b02d743486f3c0c1336139dd6b3edd20840/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L164
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>


How to use two continuously window with EventTime in sql

2019-10-29 Thread 刘建刚
  For one SQL window, I can register a table with event time and use the time
field in the tumble window. But if I want to take the result of the first
window and process it with another window, how can I do it? Thank you.


Reply: flink1.9.1 Kafka table reading problem

2019-10-29 Thread ????????
??
  maven??pom












----Original Message----
From: "hb"<343122...@163.com;
Sent: 2019-10-29 (Tuesday) 2:53
To: "user-zh"

Re: Cannot modify parallelism (rescale job) more than once

2019-10-29 Thread Pankaj Chand
Thank you!

On Mon, Oct 28, 2019 at 3:53 AM vino yang  wrote:

> Hi Pankaj,
>
> It seems it is a bug. You can report it by opening a Jira issue.
>
> Best,
> Vino
>
Pankaj Chand wrote on Mon, Oct 28, 2019 at 10:51 AM:
>
>> Hello,
>>
>> I am trying to modify the parallelism of a streaming Flink job
>> (wiki-edits example) multiple times on a standalone cluster (one local
>> machine) having two TaskManagers with 3 slots each (i.e. 6 slots total).
>> However, the "modify" command is only working once (e.g. when I change the
>> parallelism from 2 to 4). The second time (e.g. change parallelism to 6 or
>> even back to 2), it is giving an error.
>>
>> I am using Flink 1.8.1 (since I found that the modify parallelism command
>> has been removed from v1.9 documentation) and have configured savepoints to
>> be written to file:///home/pankaj/flink-checkpoints. The output of the
>> first "modify  -p 4" command and second "modify  -p 6"
>> command is copied below.
>>
>> Please tell me how to modify parallelism multiple times at runtime.
>>
>> Thanks,
>>
>> Pankaj
>>
>>
>> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
>> Modify job 94831ca34951975dbee3335a384ee935.
>> Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.
>>
>> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
>> Modify job 94831ca34951975dbee3335a384ee935.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Could not rescale job
>> 94831ca34951975dbee3335a384ee935.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.IllegalStateException: Suspend needs to happen atomically
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor.aroundReceive(Actor.scala:502)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.IllegalStateException: Suspend needs to happen
>> atomically
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>> ... 20 more
>>
>
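Since the "modify" command no longer appears in the 1.9 documentation (as noted above), the usual way to rescale a job is to take a savepoint, stop the job, and resubmit it with a new parallelism. A rough sketch of that workflow, with placeholder job id, savepoint directory, and jar path (none of these come from this thread):

$ ./bin/flink cancel -s /tmp/savepoints <jobId>
$ ./bin/flink run -d -p 6 -s /tmp/savepoints/savepoint-<id> path/to/job.jar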


Re: Reply: flink1.9.1 Kafka table reading issue

2019-10-29 Thread hb



I specified Kafka version 0.11. Previously the lib directory contained only the 0.11 jar, and this error was still reported.
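The exception asks for the 0.10-shaded class (org.apache.flink.kafka010.shaded...), so one thing worth checking, purely as a hedged suggestion and not something confirmed in this thread, is whether the fat jar itself bundles the 0.10 connector whose code requests that shaded class name. The jar path below is a placeholder:

$ jar tf target/your-fat-jar.jar | grep 'kafka010/shaded'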





At 2019-10-29 13:47:34, "如影随形" <1246407...@qq.com> wrote:
>Hi,
>
>
>   It looks like a jar conflict: your lib directory contains four flink_kafka jars. Try removing the ones you don't need and see whether it helps.
>
>
>陈浩
>
>
>-- Original Message --
>From: "hb" <343122...@163.com;
>Sent: Tuesday, October 29, 2019 2:41 PM
>To: "user-zh"
>Subject: flink1.9.1 Kafka table reading issue
>
>
>
>The code runs fine in the local IDE and produces normal output.
>
>
>After packaging it as a fat jar and submitting it to the yarn-session, it reports:
>Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> for configuration key.deserializer: Class 
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> could not be found.
>
>
>What could be the cause?
>
>
>Files in the lib directory:
>flink-dist_2.11-1.9.1.jar
> 
>flink-sql-connector-kafka-0.10_2.11-1.9.0.jar 
>flink-sql-connector-kafka_2.11-1.9.0.jar 
>log4j-1.2.17.jar
>flink-json-1.9.0-sql-jar.jar
>flink-sql-connector-kafka-0.11_2.11-1.9.0.jar 
>flink-table_2.11-1.9.1.jar
> 
>slf4j-log4j12-1.7.15.jar
>flink-shaded-hadoop-2-uber-2.6.5-7.0.jar 
>flink-sql-connector-kafka-0.9_2.11-1.9.0.jar 
>flink-table-blink_2.11-1.9.1.jar
>
>
>
>
>
>
>Code:
>```
>import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>import org.apache.flink.table.api.EnvironmentSettings
>import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>import org.apache.flink.types.Row
>
>object StreamingTable2 extends App{
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val settings: EnvironmentSettings = 
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
>settings)
> env.setParallelism(2)
>
> val sourceDDL1 =
> """create table kafka_json_source(
>   `timestamp` BIGINT,
>   id int,
>   name varchar
> ) with (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.11',
>   'connector.topic' = 'hbtest2',
>   'connector.startup-mode' = 'earliest-offset',
>   'connector.properties.0.key' = 'bootstrap.servers',
>   'connector.properties.0.value' = '192.168.1.160:19092',
>   'connector.properties.1.key' = 'group.id',
>   'connector.properties.1.value' = 'groupId1',
>   'connector.properties.2.key' = 'zookeeper.connect',
>   'connector.properties.2.value' = '192.168.1.160:2181',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> )
> """
>
> tEnv.sqlUpdate(sourceDDL1)
> tEnv.sqlQuery("select * from 
>kafka_json_source").toAppendStream[Row].print()
> env.execute("table-example2")
>}
>```


Add custom fields into Json

2019-10-29 Thread srikanth flink
Hi there,

I'm querying JSON data and it works fine. I would like to add some custom
fields alongside the query result.
My query looks like: select ROW(`source`),  ROW(`destination`), ROW(`dns`),
organization, cnt from (select (source.`ip`,source.`isInternalIP`) as
source, (destination.`ip`,destination.`isInternalIP`) as destination,
 (dns.`query`, dns.`answers`.`data`) as dns, organization as organization
from dnsTableS) tab1;

When I add "'dns' as `agg.type`, 'dns' as `agg.name`" to the
same output, the query throws an exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1,
column 278.
Was expecting one of:
"EXCEPT" ...
"FETCH" ...
"FROM" ...
"INTERSECT" ...
"LIMIT" ...
"OFFSET" ...
"ORDER" ...
"MINUS" ...
"UNION" ...
")" ...
"," ...

Could someone help me with this? Thanks

Srikanth
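The parser stops at a ".", which suggests it may be choking on the dotted alias names rather than on the constants themselves (a guess from the error message, not something verified here). A hedged sketch that sidesteps the dot by using plain aliases (agg_type and agg_name are made-up names):

select 'dns' as agg_type, 'dns' as agg_name,
       ROW(`source`), ROW(`destination`), ROW(`dns`), organization, cnt
from ( ... same inner query as above ... ) tab1;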


Reply: flink1.9.1 Kafka table reading issue

2019-10-29 Thread 如影随形
Hi,


It looks like a jar conflict: your lib directory contains four flink_kafka jars. Try removing the ones you don't need and see whether it helps.


陈浩


-- Original Message --
From: "hb" <343122...@163.com;
Sent: Tuesday, October 29, 2019 2:41 PM
To: "user-zh"

flink1.9.1 Kafka table reading issue

2019-10-29 Thread hb
The code runs fine in the local IDE and produces normal output.


After packaging it as a fat jar and submitting it to the yarn-session, it reports:
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
 for configuration key.deserializer: Class 
org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
 could not be found.


What could be the cause?


Files in the lib directory:
flink-dist_2.11-1.9.1.jar 
flink-sql-connector-kafka-0.10_2.11-1.9.0.jar  
flink-sql-connector-kafka_2.11-1.9.0.jar  
log4j-1.2.17.jar
flink-json-1.9.0-sql-jar.jar
flink-sql-connector-kafka-0.11_2.11-1.9.0.jar  
flink-table_2.11-1.9.1.jar
slf4j-log4j12-1.7.15.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar  
flink-sql-connector-kafka-0.9_2.11-1.9.0.jar   
flink-table-blink_2.11-1.9.1.jar






Code:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

object StreamingTable2 extends App{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
settings)
  env.setParallelism(2)

  val sourceDDL1 =
"""create table kafka_json_source(
`timestamp` BIGINT,
id int,
name varchar
  ) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'hbtest2',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'bootstrap.servers',
'connector.properties.0.value' = 
'192.168.1.160:19092',
'connector.properties.1.key' = 'group.id',
'connector.properties.1.value' = 'groupId1',
'connector.properties.2.key' = 'zookeeper.connect',
'connector.properties.2.value' = 
'192.168.1.160:2181',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
  )
"""

  tEnv.sqlUpdate(sourceDDL1)
  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
  env.execute("table-example2")
}
```



RE: Flink 1.8.1 HDFS 2.6.5 issue

2019-10-29 Thread V N, Suchithra (Nokia - IN/Bangalore)
Thanks for the information. Without setting this parameter explicitly, is there 
any possibility that it works intermittently?

From: Dian Fu 
Sent: Tuesday, October 29, 2019 7:12 AM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org
Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue

I guess this is a bug in Hadoop 2.6.5 that has been fixed in Hadoop 2.8.0 [1]. 
You can work around it by explicitly setting the configuration 
"hadoop.security.crypto.codec.classes.aes.ctr.nopadding" to 
"org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec, 
org.apache.hadoop.crypto.JceAesCtrCryptoCodec".

[1] https://issues.apache.org/jira/browse/HADOOP-11711
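For reference, that property would normally be set in the client's core-site.xml; a minimal sketch (the file location is an assumption, the property name and values are the ones quoted above):

<property>
  <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
  <value>org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec,org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
</property>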

On October 28, 2019, at 8:59 PM, V N, Suchithra (Nokia - IN/Bangalore) 
<suchithra@nokia.com> wrote:

Hi,
From the debug logs I can see the entries below in the taskmanager. Please have a look.

org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 3des"}
o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
[auth-conf], negotiated QOP = auth-conf"}
o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
CryptoInputStream and CryptoOutputStream."}
o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
suite configured."}
org.apache.hadoop.hdfs.DFSClient DataStreamer Exception"}
java.lang.NullPointerException: null
  at 
org.apache.hadoop.crypto.CryptoInputStream.<init>(CryptoInputStream.java:132)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)

Regards,
Suchithra

From: Dian Fu <dian0511...@gmail.com>
Sent: Monday, October 28, 2019 5:40 PM
To: V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com>
Cc: user@flink.apache.org
Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue

It seems that the CryptoCodec is null from the exception stack trace. This may 
occur when "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" is 
misconfigured. You could change the log level to "DEBUG" and it will show more 
detailed information about why CryptoCodec is null.
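One way to do that, assuming the default log4j setup shipped with the Flink distribution (an assumption, not something stated in this thread), is to raise the Hadoop client loggers to DEBUG in conf/log4j.properties:

# conf/log4j.properties: enable DEBUG output for the Hadoop client classes
log4j.logger.org.apache.hadoop=DEBUG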

On October 28, 2019, at 7:14 PM, V N, Suchithra (Nokia - IN/Bangalore) 
<suchithra@nokia.com> wrote:

Hi,

I am trying to execute Wordcount.jar on Flink 1.8.1 with Hadoop version 2.6.5. 
HDFS is enabled with Kerberos+SSL. While writing output to HDFS, the job fails 
with the exception below. Please let me know if you have any suggestions for 
debugging this issue.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at