[jira] [Created] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-05-01 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9280:
---

 Summary: Extend JobSubmitHandler to accept jar files
 Key: FLINK-9280
 URL: https://issues.apache.org/jira/browse/FLINK-9280
 Project: Flink
  Issue Type: New Feature
  Components: Job-Submission, REST
Affects Versions: 1.5.0
Reporter: Chesnay Schepler


The job submission through the CLI first uploads all required jars to the blob 
server, sets the blob keys in the job graph, and then sends this graph to the 
{{JobSubmitHandler}}, which submits it to the Dispatcher.

This process has the downside that the jars must be uploaded to the blob server 
before the job graph is submitted, a step that does not happen via REST.

I propose extending the {{JobSubmitHandler}} to also accept an optional list of 
jar files that were previously uploaded through the {{JarUploadHandler}}. If 
present, the handler would upload these jars to the blobserver and set the blob 
keys.
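For illustration, the proposed flow could look roughly like the following sketch; all class and method names here ({{JobSubmitSketch}}, {{BlobUploader}}, {{FakeJobGraph}}) are hypothetical stand-ins, not actual Flink types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed flow: the client references jars
// previously uploaded via the JarUploadHandler by name, and the handler
// resolves them to blob keys before handing the graph to the Dispatcher.
public class JobSubmitSketch {

    /** Stand-in for a blob server client. */
    interface BlobUploader {
        String upload(String jarName); // returns a blob key
    }

    /** Stand-in for a job graph carrying its user-jar blob keys. */
    static class FakeJobGraph {
        final List<String> userJarBlobKeys = new ArrayList<>();
    }

    /** What the extended JobSubmitHandler would do with the optional jar list. */
    static FakeJobGraph attachJars(FakeJobGraph graph, List<String> jarNames, BlobUploader blobs) {
        if (jarNames != null) { // the jar list is optional
            for (String jar : jarNames) {
                graph.userJarBlobKeys.add(blobs.upload(jar));
            }
        }
        return graph;
    }

    public static void main(String[] args) {
        Map<String, String> blobStore = new LinkedHashMap<>();
        BlobUploader uploader = jar -> blobStore.computeIfAbsent(jar, j -> "blob-key-for-" + j);
        FakeJobGraph graph = attachJars(new FakeJobGraph(),
                Arrays.asList("udf.jar", "connector.jar"), uploader);
        System.out.println(graph.userJarBlobKeys); // [blob-key-for-udf.jar, blob-key-for-connector.jar]
    }
}
```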





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6420) Cleaner CEP API to specify conditions between events

2018-05-01 Thread Tarush Grover (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459591#comment-16459591
 ] 

Tarush Grover commented on FLINK-6420:
--

Can I take this up?

> Cleaner CEP API to specify conditions between events
> 
>
> Key: FLINK-6420
> URL: https://issues.apache.org/jira/browse/FLINK-6420
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Elias Levy
>Priority: Minor
>
> Flink 1.3 will introduce so-called iterative conditions, which allow the 
> predicate to look up events already matched by conditions in the pattern.  
> This permits specifying conditions between matched events, similar to a 
> conditional join between tables in SQL.  Alas, the API could be simplified to 
> specify such conditions more declaratively.
> At the moment you have to do something like
> {code}
> Pattern
>   .begin[Foo]("first")
>     .where( first => first.baz == 1 )
>   .followedBy("next")
>     .where({ (next, ctx) =>
>       val first = ctx.getEventsForPattern("first").next
>       first.bar == next.bar && next.boo == "x"
>     })
> {code}
> which is not very clean.  It would be friendlier if you could do something like:
> {code}
> Pattern
>   .begin[Foo]("first")
>     .where( first => first.baz == 1 )
>   .followedBy("next")
>     .relatedTo("first", { (first, next) => first.bar == next.bar })
>     .where( next => next.boo == "x" )
> Something along these lines would work well when the condition being tested 
> against matches a single event (single quantifier).  
> If the condition being tested can accept multiple events (e.g. times 
> quantifier) two other methods could be used {{relatedToAny}} and 
> {{relatedToAll}}, each of which takes a predicate function.  In both cases 
> each previously accepted element of the requested condition is evaluated 
> against the predicate.  In the former case, if any evaluation returns true, the 
> condition is satisfied.  In the latter case, all evaluations must return true 
> for the condition to be satisfied.
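For illustration, the proposed any/all semantics can be sketched independently of the CEP API (the {{relatedToAny}}/{{relatedToAll}} helpers below are plain stand-ins, not the real API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Illustration of the proposed semantics, not the actual CEP API:
// relatedToAny is satisfied if the predicate holds for at least one
// previously accepted event; relatedToAll only if it holds for all of them.
public class RelatedToSemantics {

    static <A, B> boolean relatedToAny(List<A> accepted, B next, BiPredicate<A, B> p) {
        return accepted.stream().anyMatch(a -> p.test(a, next));
    }

    static <A, B> boolean relatedToAll(List<A> accepted, B next, BiPredicate<A, B> p) {
        return accepted.stream().allMatch(a -> p.test(a, next));
    }

    public static void main(String[] args) {
        List<Integer> bars = Arrays.asList(1, 2, 3); // "bar" values of events matched so far
        int nextBar = 2;
        System.out.println(relatedToAny(bars, nextBar, Integer::equals)); // true: 2 matches
        System.out.println(relatedToAll(bars, nextBar, Integer::equals)); // false: 1 and 3 do not
    }
}
```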





[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown

2018-05-01 Thread yuqi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459665#comment-16459665
 ] 

yuqi commented on FLINK-9196:
-

[~gjy] I see, thanks for your suggestion.

> YARN: Flink binaries are not deleted from HDFS after cluster shutdown
> -
>
> Key: FLINK-9196
> URL: https://issues.apache.org/jira/browse/FLINK-9196
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: 0001-xxx.patch
>
>
> When deploying on YARN in flip6 mode, the Flink binaries are not deleted from 
> HDFS after the cluster shuts down.
> *Steps to reproduce*
> # Submit job in YARN job mode, non-detached:
> {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat}
> # Check contents of {{/user/hadoop/.flink/}} on HDFS after 
> job is finished:
> {noformat}
> [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls 
> /user/hadoop/.flink/application_1523966184826_0016
> Found 6 items
> -rw-r--r--   1 hadoop hadoop583 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml
> -rw-r--r--   1 hadoop hadoop332 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp
> -rw-r--r--   1 hadoop hadoop   89779342 2018-04-02 17:08 
> /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar
> drwxrwxrwx   - hadoop hadoop  0 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/lib
> -rw-r--r--   1 hadoop hadoop   1939 2018-04-02 15:37 
> /user/hadoop/.flink/application_1523966184826_0016/log4j.properties
> -rw-r--r--   1 hadoop hadoop   2331 2018-04-02 15:37 
> /user/hadoop/.flink/application_1523966184826_0016/logback.xml
> {noformat}





[jira] [Commented] (FLINK-9261) Regression - Flink CLI and Web UI not working when SSL is enabled

2018-05-01 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459686#comment-16459686
 ] 

Chesnay Schepler commented on FLINK-9261:
-

Given that the UI and client partly go through the same REST calls, I'm not 
sure whether we can implement things such that the client is authenticated but 
the UI isn't.
What we could maybe do is separate the REST API into a monitoring part (getting 
details for a job/JM) and a control part (submitting jobs, savepoints, 
cancellation) with separate SSL settings.



> Regression - Flink CLI and Web UI not working when SSL is enabled
> -
>
> Key: FLINK-9261
> URL: https://issues.apache.org/jira/browse/FLINK-9261
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Network, Web Client
>Affects Versions: 1.5.0
>Reporter: Edward Rojas
>Priority: Blocker
>  Labels: regression
> Fix For: 1.5.0
>
>
> When the *security.ssl.enabled* config is set to true, the Web UI is no longer 
> reachable; there are no logs on the jobmanager. 
>  
> When setting *web.ssl.enabled* to false (keeping security.ssl.enabled set to 
> true), the dashboard is still not reachable and the following exception appears 
> on the jobmanager: 
> {code:java}
> WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - 
> Unhandled exception
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: 
> not an SSL/TLS record: 
> 474554202f20485454502f312e310d0a486f73743a206c6f63616c686f73743a383038310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a557067726164652d496e7365637572652d52657175657374733a20310d0a557365722d4167656e743a204d6f7a696c6c612f352e3020284d6163696e746f73683b20496e74656c204d6163204f5320582031305f31335f3329204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f36352e302e32352e313831205361666172692f3533372e33360d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c696d6167652f776562702c696d6167652f61706e672c2a2f2a3b713d302e380d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2c656e2d47423b713d302e392c65732d3431393b713d302e382c65733b713d302e372c66722d46523b713d302e362c66723b713d302e350d0a436f6f6b69653a20496465612d39326365626136363d39396464633637632d613838382d346439332d396166612d3737396631373636326264320d0a49662d4d6f6469666965642d53696e63653a205468752c2032362041707220323031382031313a30313a313520474d540d0a0d0a
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:940)
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Also, when trying to use the Flink CLI, it gets stuck on "Waiting for 
> response..." and there are no error messages on the jobmanager. None of the 
> commands work: list, run, etc.
>  
> Taskmanagers are able to register with the Jobmanager, so the SSL configuration 
> is good.
>  
> SSL configuration:
> security.ssl.enabled: true
> security.ssl.keystore: /path/to/keystore
> security.ssl.keystore-password: 
> security.ssl.key-password: 
> security.ssl.truststore: /path/to/truststore
> security.ssl.truststore-password: 
> web.ssl.enabled: false
> This same con
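Aside: the long hex string in the {{NotSslRecordException}} above is just the raw bytes netty received, and decoding it reveals a plaintext HTTP request hitting the SSL port. A quick decoding sketch (an illustrative helper, not part of Flink or netty):

```java
// Decodes the hex payload that netty logs for a NotSslRecordException.
// The bytes turn out to be a plaintext HTTP request, which is why the
// SSL handler rejects them.
public class HexDecode {

    static String decodeHex(String hex) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < hex.length(); i += 2) {
            out.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // First bytes of the payload from the log above.
        String prefix = "474554202f20485454502f312e31";
        System.out.println(decodeHex(prefix)); // GET / HTTP/1.1
    }
}
```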

[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-01 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5928
  
@StephanEwen @alpinegizmo Thanks for your comments, I will revert the 
changes to `config.md` and address the comments concerning the "`file://` only 
for local setups" part.


---


[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...

2018-05-01 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459784#comment-16459784
 ] 

Chesnay Schepler commented on FLINK-9267:
-

I set up a local 1.3.1 cluster, freshly built from source, with no 
modifications to /lib or flink-conf.yaml.

I can successfully run the job after placing {{gradoop-demo-shaded.jar}} into 
the /lib folder.

I can also successfully submit jobs using the {{RemoteEnvironment}} after 
hard-coding the absolute path to the fat-jar ({{gradoop-demo-shaded.jar}}) in 
the {{RemoteEnvironment}} constructor call in 
{{Server#getExecutionEnvironment}}, at commit 
b7737c43364ebd0eb8724be64f1f53b60c845ffd.

As it stands I cannot reproduce the problem.

I used maven 3.5 and openjdk 1.8.0_162.

> Classloading issues when using RemoteEnvironment ...
> 
>
> Key: FLINK-9267
> URL: https://issues.apache.org/jira/browse/FLINK-9267
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Affects Versions: 1.4.2
> Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 
> 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>Reporter: Kedar Mhaswade
>Priority: Major
>
> See these two threads:
>  * [Nov 
> 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
>  * [April 
> 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the 
> {{RemoteEnvironment}}. The instructions to [reproduce are 
> here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E].
>  





[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-01 Thread tarun razdan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459804#comment-16459804
 ] 

tarun razdan commented on FLINK-7789:
-

Hi Team,

Can anyone suggest an alternate method to handle the timeout exception until 
the issue gets resolved?

Cheers,

Tarun Razdan

> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out, an exception is thrown and the job is restarted. It 
> would be good to pass an AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html
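Until such a handler exists, one generic workaround (sketched below without any Flink API; all names are illustrative) is to guard each async request with a future that completes with a fallback value once a deadline passes, so no timeout exception ever reaches the operator:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Generic sketch of a timeout fallback, independent of the Flink API:
// instead of letting the async request time out and fail the job, complete
// the result with a default value once the deadline passes.
public class TimeoutFallback {

    // Daemon timer thread so the JVM can exit without an explicit shutdown.
    static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    static <T> CompletableFuture<T> withFallback(CompletableFuture<T> request,
                                                 T fallback, long timeoutMs) {
        CompletableFuture<T> guarded = new CompletableFuture<>();
        request.thenAccept(guarded::complete);
        // If the request has not answered by the deadline, answer with the fallback.
        TIMER.schedule(() -> guarded.complete(fallback), timeoutMs, TimeUnit.MILLISECONDS);
        return guarded;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completes
        System.out.println(withFallback(slow, "fallback", 50).join()); // fallback
        CompletableFuture<String> fast = CompletableFuture.completedFuture("answer");
        System.out.println(withFallback(fast, "fallback", 50).join()); // answer
    }
}
```

Whether the fallback result is acceptable is application-specific; the point is only that the operator observes a completed result rather than a timeout failure.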





[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...

2018-05-01 Thread Kedar Mhaswade (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459924#comment-16459924
 ] 

Kedar Mhaswade commented on FLINK-9267:
---

This is very strange! Thanks [~Zentol] for trying this out, but I have the 
following environment and it does not work for me:
 # Mac OS High Sierra 10.13.4
 # {{java version "1.8.0_161"}}
 # Install the binary version of [Flink 
1.3.1|https://archive.apache.org/dist/flink/flink-1.3.1/flink-1.3.1-bin-hadoop2-scala_2.11.tgz].
 I believe this is the correct version. Let me know if I should use a different 
one.
 # Clone [https://github.com/kedarmhaswade/gradoop_demo.git]
 # cd gradoop_demo; git checkout -b dev
 # mvn clean package; {{mvn -version}} prints:
{noformat}
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 
2015-11-10T08:41:47-08:00)
Maven home: /usr/local/Cellar/maven/3.3.9/libexec
Java version: 1.8.0_161, vendor: Oracle Corporation
Java home: /Users/kedar/.sdkman/candidates/java/8.0.161-oracle/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.4", arch: "x86_64", family: "mac"{noformat}

 # The head commit is: {{042e5f8 (HEAD -> dev, origin/dev) comment out the 
test; trying to reproduce Chesnay's environment for FLINK-9267}}.
 # Copy the built {{target/gradoop-demo-0.2.0.jar}} to {{/lib}}.
 # Start Flink Cluster: {{/bin/start-cluster.sh}}.
 # Start the Gradoop demo server against the Flink cluster started in step 9: {{java -cp 
target/classes:target/gradoop-demo-shaded.jar org.gradoop.demo.server.Server 
--jmhost localhost --jmport 6123}}
 # Either use the browser to connect to 
{{http://localhost:2342/gradoop/html/cypher.html}} or do a simple curl query: 
{{curl -vL -X POST http://localhost:2342/keys/Example  | jq}}
 # The Gradoop demo server throws an exception:

{noformat}
org.apache.flink.client.program.ProgramInvocationException: Could not start the 
ActorSystem needed to talk to the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
at org.gradoop.demo.server.RequestHandler.getResponse(RequestHandler.java:470)
at 
org.gradoop.demo.server.RequestHandler.createResponse(RequestHandler.java:453)
at org.gradoop.demo.server.RequestHandler.executeCypher(RequestHandler.java:143)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
at 
com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
at 
com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
at 
com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
at 
com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at 
com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
at 
com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at 
com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
at 
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
at 
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
at 
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
at 
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
at 
com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer._service(GrizzlyContainer.java:222)
at 
com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer.service(GrizzlyContainer.java:192)
at org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
at 
org.glassfish.grizzly.http.server.HttpHandlerChain.service(HttpHandlerChain.java:196)
at org.glassfish.grizzly.ht

[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...

2018-05-01 Thread Kedar Mhaswade (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460002#comment-16460002
 ] 

Kedar Mhaswade commented on FLINK-9267:
---

I also built Flink 1.3.1 from 
[source|https://archive.apache.org/dist/flink/flink-1.3.1/flink-1.3.1-src.tgz] 
and still get the same result on the Gradoop server side! I would really like to 
know what you did to be able to submit the Flink job from the Gradoop server.

> Classloading issues when using RemoteEnvironment ...
> 
>
> Key: FLINK-9267
> URL: https://issues.apache.org/jira/browse/FLINK-9267
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Affects Versions: 1.4.2
> Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 
> 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>Reporter: Kedar Mhaswade
>Priority: Major
>
> See these two threads:
>  * [Nov 
> 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
>  * [April 
> 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the 
> {{RemoteEnvironment}}. The instructions to [reproduce are 
> here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E].
>  





[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...

2018-05-01 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460019#comment-16460019
 ] 

Chesnay Schepler commented on FLINK-9267:
-

Please add the following to your {{maven-shade-plugin}} configuration:

{code}
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>reference.conf</resource>
  </transformer>
</transformers>
{code}

Source: 
https://stackoverflow.com/questions/31011243/no-configuration-setting-found-for-key-akka-version

> Classloading issues when using RemoteEnvironment ...
> 
>
> Key: FLINK-9267
> URL: https://issues.apache.org/jira/browse/FLINK-9267
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Affects Versions: 1.4.2
> Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 
> 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>Reporter: Kedar Mhaswade
>Priority: Major
>
> See these two threads:
>  * [Nov 
> 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
>  * [April 
> 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the 
> {{RemoteEnvironment}}. The instructions to [reproduce are 
> here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E].
>  





[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...

2018-05-01 Thread Kedar Mhaswade (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460061#comment-16460061
 ] 

Kedar Mhaswade commented on FLINK-9267:
---

Wow! Thank you for the tip above. I have been able to talk to the cluster now. 
There are still serialization errors:
{noformat}
2018-05-01 12:30:17,598 ERROR Remoting  
- scala.Option; local class incompatible: stream classdesc 
serialVersionUID = -114498752079829388, local class serialVersionUID = 
-2062608324514658839
java.io.InvalidClassException: scala.Option; local class incompatible: stream 
classdesc serialVersionUID = -114498752079829388, local class serialVersionUID 
= -2062608324514658839
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at 
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at 
akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at 
akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at 
akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}
but this gives me something to work with. Thanks again. We will soon close 
this as "not a Flink issue".

> Classloading issues when using RemoteEnvironment ...
> 
>
> Key: FLINK-9267
> URL: https://issues.apache.org/jira/browse/FLINK-9267
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Affects Versions: 1.4.2
> Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 
> 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>Reporter: Kedar Mhaswade
>Priority: Major
>
> See these two threads:
>  * [Nov 
> 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
>  * [April 
> 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the 
> {{RemoteEnvironment}}. The inst

[jira] [Created] (FLINK-9281) LogBack not working

2018-05-01 Thread Tim (JIRA)
Tim created FLINK-9281:
--

 Summary: LogBack not working
 Key: FLINK-9281
 URL: https://issues.apache.org/jira/browse/FLINK-9281
 Project: Flink
  Issue Type: Bug
  Components: Logging
Affects Versions: 1.4.2
Reporter: Tim


I am trying to get Flink to work with Logback instead of Log4j; however, it 
is not working.

My setup follows the advice on this page: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster
 * Flink v1.4.2 running a stand-alone cluster. 
 * Started JobManager as a foreground process (bin/jobmanager.sh 
start-foreground cluster).  I updated bin/flink-console.sh to reference 
logback.xml via -Dlogback.configurationFile=file:/path/to/logfile.
 * Removed the log4j jars under lib/ (log4j-1.2.xx.jar and slf4j-log4j12-xxx.jar)
 * Added the logback jars under lib/ (logback-classic, logback-core, 
log4j-over-slf4j.jar)

However, no log file is created. As a dumb test, I also referenced a 
non-existent logback.xml file (changed the path to a non-existent folder) just 
to see if any errors would appear on stdout, but nothing did.

Thanks

 





[jira] [Updated] (FLINK-9150) Prepare for Java 10

2018-05-01 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9150:
--
Description: 
Java 9 is not an LTS release.

When compiling with Java 10, I see the following compilation error:

{code}
[ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not 
resolve dependencies for project 
org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find artifact 
jdk.tools:jdk.tools:jar:1.6 at specified path /a/jdk-10/../lib/tools.jar -> 
[Help 1]
{code}

  was:
Java 9 is not an LTS release.

When compiling with Java 10, I see the following compilation error:
{code}
[ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not 
resolve dependencies for project 
org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find artifact 
jdk.tools:jdk.tools:jar:1.6 at specified path /a/jdk-10/../lib/tools.jar -> 
[Help 1]
{code}


> Prepare for Java 10
> ---
>
> Key: FLINK-9150
> URL: https://issues.apache.org/jira/browse/FLINK-9150
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Priority: Major
>
> Java 9 is not an LTS release.
> When compiling with Java 10, I see the following compilation error:
> {code}
> [ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not 
> resolve dependencies for project 
> org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find 
> artifact jdk.tools:jdk.tools:jar:1.6 at specified path 
> /a/jdk-10/../lib/tools.jar -> [Help 1]
> {code}





[jira] [Updated] (FLINK-9048) LocalFlinkMiniClusterITCase#testLocalFlinkMiniClusterWithMultipleTaskManagers sometimes fails

2018-05-01 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9048:
--
Description: 
As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following :

{code}
testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)
  Time elapsed: 41.681 sec  <<< FAILURE!
java.lang.AssertionError: Thread 
Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini 
cluster, but not shut down
  at org.junit.Assert.fail(Assert.java:88)
  at 
org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
{code}

  was:
As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following :
{code}
testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)
  Time elapsed: 41.681 sec  <<< FAILURE!
java.lang.AssertionError: Thread 
Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini 
cluster, but not shut down
  at org.junit.Assert.fail(Assert.java:88)
  at 
org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
{code}


> LocalFlinkMiniClusterITCase#testLocalFlinkMiniClusterWithMultipleTaskManagers 
> sometimes fails
> -
>
> Key: FLINK-9048
> URL: https://issues.apache.org/jira/browse/FLINK-9048
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following :
> {code}
> testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)
>   Time elapsed: 41.681 sec  <<< FAILURE!
> java.lang.AssertionError: Thread 
> Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini 
> cluster, but not shut down
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8933) Avoid calling Class#newInstance

2018-05-01 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8933:
--
Description: 
Class#newInstance is deprecated starting in Java 9 - 
https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw 
undeclared checked exceptions.

The suggested replacement is getDeclaredConstructor().newInstance(), which 
wraps checked exceptions in InvocationTargetException.

  was:
Class#newInstance is deprecated starting in Java 9 - 
https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw 
undeclared checked exceptions.


The suggested replacement is getDeclaredConstructor().newInstance(), which 
wraps checked exceptions in InvocationTargetException.


> Avoid calling Class#newInstance
> ---
>
> Key: FLINK-8933
> URL: https://issues.apache.org/jira/browse/FLINK-8933
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Class#newInstance is deprecated starting in Java 9 - 
> https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw 
> undeclared checked exceptions.
> The suggested replacement is getDeclaredConstructor().newInstance(), which 
> wraps checked exceptions in InvocationTargetException.
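The migration the issue asks for can be sketched as follows; `Widget` is a hypothetical class used only for illustration. Note that checked exceptions thrown by the constructor are wrapped in `java.lang.reflect.InvocationTargetException`.

```java
import java.lang.reflect.InvocationTargetException;

public class NewInstanceMigration {

    public static class Widget {
        public Widget() {}
    }

    @SuppressWarnings("deprecation")
    public static Widget oldWay() throws Exception {
        // Deprecated since Java 9: may propagate undeclared checked exceptions
        return Widget.class.newInstance();
    }

    public static Widget newWay() throws ReflectiveOperationException {
        // Suggested replacement: constructor exceptions arrive wrapped
        // in InvocationTargetException (a ReflectiveOperationException)
        return Widget.class.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(oldWay() != null && newWay() != null); // prints "true"
    }
}
```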



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...

2018-05-01 Thread stevenzwu
Github user stevenzwu commented on the issue:

https://github.com/apache/flink/pull/4607
  
@aljoscha is there any doc/write-up about the reworking of BucketingSink?


---


[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2018-05-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460360#comment-16460360
 ] 

ASF GitHub Bot commented on FLINK-6306:
---

Github user stevenzwu commented on the issue:

https://github.com/apache/flink/pull/4607
  
@aljoscha is there any doc/write-up about the reworking of BucketingSink?


> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly-once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3, where most operations (i.e. rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that provides exactly-once writes 
> to a file system where only PUT operations are considered consistent. 
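One common way to get exactly-once behavior out of a PUT-only store, consistent with the requirement above, is to make writes idempotent: derive each object key deterministically from the checkpoint id and subtask index, so a replay re-PUTs byte-identical content under the same key. This is a sketch of that naming idea, not the actual sink design; the names `objectKey` and `bucketPath` are illustrative, not Flink API.

```java
public class PutOnlyKeying {

    // Deterministic key: replaying checkpoint N for subtask S overwrites
    // the same object with the same bytes, so a PUT-only store converges.
    public static String objectKey(String bucketPath, long checkpointId, int subtask) {
        return String.format("%s/part-%d-%d", bucketPath, subtask, checkpointId);
    }

    public static void main(String[] args) {
        // A replay of the same checkpoint yields the same key -> idempotent PUT
        System.out.println(objectKey("s3://logs/2018-05-01", 42, 3));
        // prints "s3://logs/2018-05-01/part-3-42"
    }
}
```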



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9273) Class cast exception

2018-05-01 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460389#comment-16460389
 ] 

Bob Lau commented on FLINK-9273:


[~StephanEwen]  I want to transform a DataStream into the table environment, and 
I call registerDataStream via the Scala API as follows.

The message I receive from the MQ is a JSON string. The whole code is as 
follows:

 
{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input,
        String[] fields, TypeInformation[] typeInfos, Boolean arrayFlag) {

    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row);
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
                // swallowed
            }
        }
    });

    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

 

 

> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connecto

[jira] [Comment Edited] (FLINK-9273) Class cast exception

2018-05-01 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460389#comment-16460389
 ] 

Bob Lau edited comment on FLINK-9273 at 5/2/18 1:47 AM:


[~StephanEwen]  I want to transform a DataStream into the table environment, and 
I call registerDataStream via the Scala API as follows.

The message I receive from the MQ is a JSON string. The whole code is as 
follows:

 
{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input,
        String[] fields, TypeInformation[] typeInfos, Boolean arrayFlag) {

    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row); // The exception will happen here
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
                // swallowed
            }
        }
    });

    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

 

 


was (Author: bob365):
[~StephanEwen]  I want to transform a DataStream into the table environment, and 
I call registerDataStream via the Scala API as follows.

The message I receive from the MQ is a JSON string. The whole code is as 
follows:

 
{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input,
        String[] fields, TypeInformation[] typeInfos, Boolean arrayFlag) {

    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row);
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
                // swallowed
            }
        }
    });

    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

 

 

> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractSt

[jira] [Comment Edited] (FLINK-9273) Class cast exception

2018-05-01 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460389#comment-16460389
 ] 

Bob Lau edited comment on FLINK-9273 at 5/2/18 1:49 AM:


[~StephanEwen]  I want to transform a DataStream into the table environment, and 
I call registerDataStream via the Scala API as follows.

The message I receive from the MQ is a JSON string. The whole code is as 
follows:

 

{code:scala}
def registerTableFromRowDataStream(
    tableName: String,
    inputDataStream: DataStream[Row],
    tEnv: StreamTableEnvironment,
    fields: Array[String],
    typeInfos: Array[TypeInformation[_]]): Unit = {

  implicit val tpe: TypeInformation[Row] = new RowTypeInfo(typeInfos, fields)

  val cRowType = CRowTypeInfo(tpe)

  val newDataStream = inputDataStream.map(new RichMapFunction[Row, CRow] {

    @transient private var outCRow: CRow = null

    override def open(parameters: Configuration): Unit = {
      outCRow = new CRow(null, change = true)
    }

    override def map(v: Row): CRow = {
      outCRow.row = v
      outCRow
    }
  }).returns(cRowType)

  tEnv.registerDataStream(tableName, newDataStream)
}
{code}

 
{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input,
        String[] fields, TypeInformation[] typeInfos, Boolean arrayFlag) {

    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row); // The exception will happen here
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
                // swallowed
            }
        }
    });

    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

 

 


was (Author: bob365):
[~StephanEwen]  I want to transform a DataStream into the table environment, and 
I call registerDataStream via the Scala API as follows.

The message I receive from the MQ is a JSON string. The whole code is as 
follows:

 
{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input,
        String[] fields, TypeInformation[] typeInfos, Boolean arrayFlag) {

    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row); // The exception will happen here
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
                // swallowed
            }
        }
    });

    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

 

 

> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runW

[jira] [Commented] (FLINK-9273) Class cast exception

2018-05-01 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460407#comment-16460407
 ] 

Bob Lau commented on FLINK-9273:


[~StephanEwen] Good morning! Thanks for your response! I would like to know 
whether the reason is that the `Collector collector` cannot collect the `Row` 
object correctly because of the raw type `Row`?
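For what it's worth, the raw `Row` is likely not the culprit by itself: `Row` stores fields as `Object`, so a mismatch only surfaces when the field serializer derived from the declared `RowTypeInfo` (here `LongSerializer`) casts the value. A stripped-down sketch of that failure mode, in plain Java rather than Flink classes:

```java
public class RowFieldCastSketch {

    // Mimics LongSerializer.copy: the serializer chosen from the declared
    // type info blindly casts the Object field to Long.
    static Long copyAsLong(Object field) {
        return (Long) field;
    }

    public static void main(String[] args) {
        Object[] row = { "12345" }; // JSON parsing produced a String field
        try {
            copyAsLong(row[0]);
        } catch (ClassCastException e) {
            // Same failure as in the reported stack trace
            System.out.println("ClassCastException"); // prints "ClassCastException"
        }
    }
}
```

Declaring the column as a String type (or converting the value before `collect`) would avoid the cast.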

> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:307)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> ... 1 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-05-01 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9269:
--
Priority: Critical  (was: Major)

> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.5.0
>
>
> {code:java}
> @Test
> public void testConccurrencyProblem() throws Exception {
>   CheckpointStreamFactory streamFactory = createStreamFactory();
>   Environment env = new DummyEnvironment();
>   AbstractKeyedStateBackend backend = 
> createKeyedBackend(IntSerializer.INSTANCE, env);
>   try {
>   long checkpointID = 0;
>   List futureList = new ArrayList();
>   for (int i = 0; i < 10; ++i) {
>   ValueStateDescriptor kvId = new 
> ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
>   ValueState state = 
> backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
>   ((InternalValueState) 
> state).setCurrentNamespace(VoidNamespace.INSTANCE);
>   backend.setCurrentKey(i);
>   state.update(i);
>   
> futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, 
> System.currentTimeMillis(), streamFactory, 
> CheckpointOptions.forCheckpointWithDefaultLocation(;
>   }
>   for (Future future : futureList) {
>   future.get();
>   }
>   } catch (Exception e) {
>   fail();
>   } finally {
>   backend.dispose();
>   }
> }
> protected Future runSnapshotAsync(
>   RunnableFuture> 
> snapshotRunnableFuture) throws Exception {
>   if (!snapshotRunnableFuture.isDone()) {
>   return Executors.newFixedThreadPool(5).submit(() -> {
>   try {
>   snapshotRunnableFuture.run();
>   snapshotRunnableFuture.get();
>   } catch (Exception e) {
>   e.printStackTrace();
>   fail();
>   }
>   });
>   }
>   return null;
> }
> {code}
> Place the above code in `StateBackendTestBase` and run 
> `AsyncMemoryStateBackendTest`; it will fail with the following exception:
> {code}
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84)
>   ... 5 more
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745
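The hazard this test exposes is a generic one: an async snapshot reads backend state while the task thread keeps mutating it. A common remedy is to capture an immutable copy (or copy-on-write view) synchronously, before handing work to the snapshot thread; whether this matches the eventual FLINK-9269 fix is an assumption here, and the class below is only an illustrative sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class SnapshotCopySketch {

    private final Map<Integer, Integer> state = new HashMap<>();

    public void update(int key, int value) {
        state.put(key, value);
    }

    // Synchronous part: capture a copy before going async, so later
    // mutations by the task thread cannot race with the snapshot writer.
    public CompletableFuture<Map<Integer, Integer>> snapshotAsync() {
        final Map<Integer, Integer> copy = new HashMap<>(state);
        return CompletableFuture.supplyAsync(() -> copy);
    }

    public static void main(String[] args) {
        SnapshotCopySketch backend = new SnapshotCopySketch();
        backend.update(1, 1);
        CompletableFuture<Map<Integer, Integer>> f = backend.snapshotAsync();
        backend.update(1, 999); // mutation after the snapshot point
        System.out.println(f.join().get(1)); // prints "1"
    }
}
```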