[jira] [Updated] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-9269:
------------------------------
    Priority: Critical  (was: Major)

> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9269
>                 URL: https://issues.apache.org/jira/browse/FLINK-9269
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Critical
>             Fix For: 1.5.0
>
> {code:java}
> @Test
> public void testConccurrencyProblem() throws Exception {
>     CheckpointStreamFactory streamFactory = createStreamFactory();
>     Environment env = new DummyEnvironment();
>     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
>     try {
>         long checkpointID = 0;
>         List<Future> futureList = new ArrayList<>();
>         for (int i = 0; i < 10; ++i) {
>             ValueStateDescriptor<Integer> kvId = new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
>             ValueState<Integer> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
>             ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
>             backend.setCurrentKey(i);
>             state.update(i);
>             futureList.add(runSnapshotAsync(backend.snapshot(
>                 checkpointID++, System.currentTimeMillis(), streamFactory,
>                 CheckpointOptions.forCheckpointWithDefaultLocation())));
>         }
>         for (Future future : futureList) {
>             future.get();
>         }
>     } catch (Exception e) {
>         fail();
>     } finally {
>         backend.dispose();
>     }
> }
>
> protected Future runSnapshotAsync(
>         RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
>     if (!snapshotRunnableFuture.isDone()) {
>         return Executors.newFixedThreadPool(5).submit(() -> {
>             try {
>                 snapshotRunnableFuture.run();
>                 snapshotRunnableFuture.get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>                 fail();
>             }
>         });
>     }
>     return null;
> }
> {code}
> Place the above code in `StateBackendTestBase` and run `AsyncMemoryStateBackendTest`; it fails with the following exception:
> {code}
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662)
>     at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84)
>     ... 5 more
> {code}
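A plausible reading of the trace: the test registers a new state (`getOrCreateKeyedState`) on the task thread while earlier snapshots are still iterating the backend's state tables on the async snapshot threads, with no synchronization between the two. A minimal, framework-independent sketch of that race follows; the class, field, and method names are illustrative only, not Flink's actual internals:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed backend: a plain HashMap shared between
// the task thread (registerState) and the async snapshot thread (snapshot)
// without synchronization or a copy-on-write view of the map.
public class SnapshotRaceSketch {
    private final Map<String, Object> stateTables = new HashMap<>();

    // Task thread: analogous to getOrCreateKeyedState(...) in the test above.
    void registerState(String name) {
        stateTables.put(name, new Object());
    }

    // Snapshot thread: iterates the map concurrently. A put(...) racing with
    // this loop can observe a partially resized table and fail with a
    // NullPointerException or ConcurrentModificationException.
    void snapshot() {
        for (Map.Entry<String, Object> entry : stateTables.entrySet()) {
            entry.getValue(); // serialize the state table here
        }
    }
}
{code}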
[jira] [Commented] (FLINK-9273) Class cast exception
[ https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460407#comment-16460407 ]

Bob Lau commented on FLINK-9273:
--------------------------------
[~StephanEwen] Good morning! Thanks for your response! Is the reason that the `Collector collector` cannot collect the `Row` object correctly because `Row` is used as a raw type?

> Class cast exception
> --------------------
>
>                 Key: FLINK-9273
>                 URL: https://issues.apache.org/jira/browse/FLINK-9273
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Streaming, Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Bob Lau
>            Priority: Major
>
> Exception stack is as follows:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
>     at com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
>     at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:307)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     ... 1 more
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9273) Class cast exception
[ https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460389#comment-16460389 ]

Bob Lau edited comment on FLINK-9273 at 5/2/18 1:49 AM:

[~StephanEwen] I want to transform a DataStream into the table environment, and I register the DataStream via Scala as follows. The messages I receive from the MQ are JSON strings. The whole code is as follows:

{code}
def registerTableFromRowDataStream(
    tableName: String,
    inputDataStream: DataStream[Row],
    tEnv: StreamTableEnvironment,
    fields: Array[String],
    typeInfos: Array[TypeInformation[_]]): Unit = {
  implicit val tpe: TypeInformation[Row] = new RowTypeInfo(typeInfos, fields)
  val cRowType = CRowTypeInfo(tpe)
  val newDataStream = inputDataStream.asInstanceOf[DataStream[Row]]
    .map(new RichMapFunction[Row, CRow] {
      @transient private var outCRow: CRow = null

      override def open(parameters: Configuration): Unit = {
        outCRow = new CRow(null, change = true)
      }

      override def map(v: Row): CRow = {
        outCRow.row = v
        outCRow
      }
    }).returns(cRowType)
  tEnv.registerDataStream(tableName, newDataStream)
}
{code}

{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input, String[] fields,
        TypeInformation<?>[] typeInfos, Boolean arrayFlag) {
    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row); // The exception will happen here
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
            }
        }
    });
    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

> Class cast exception
> --------------------
>
>                 Key: FLINK-9273
>                 URL: https://issues.apache.org/jira/browse/FLINK-9273
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Streaming, Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Bob Lau
>            Priority: Major
>
> Exception stack is as follows:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
>     at com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
>     at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> {code}
[jira] [Comment Edited] (FLINK-9273) Class cast exception
[ https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460389#comment-16460389 ]

Bob Lau edited comment on FLINK-9273 at 5/2/18 1:47 AM:

[~StephanEwen] I want to transform a DataStream into the table environment, and I register the DataStream via Scala as follows. The messages I receive from the MQ are JSON strings. The whole code is as follows:

{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input, String[] fields,
        TypeInformation<?>[] typeInfos, Boolean arrayFlag) {
    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row); // The exception will happen here
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
            }
        }
    });
    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

> Class cast exception
> --------------------
>
>                 Key: FLINK-9273
>                 URL: https://issues.apache.org/jira/browse/FLINK-9273
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Streaming, Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Bob Lau
>            Priority: Major
>
> Exception stack is as follows:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
>     at com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
>     at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> {code}
[jira] [Commented] (FLINK-9273) Class cast exception
[ https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460389#comment-16460389 ]

Bob Lau commented on FLINK-9273:
--------------------------------
[~StephanEwen] I want to transform a DataStream into the table environment, and I register the DataStream via Scala as follows. The messages I receive from the MQ are JSON strings. The whole code is as follows:

{code:java}
public static DataStream<Row> deserializationToRow(DataStream<String> input, String[] fields,
        TypeInformation<?>[] typeInfos, Boolean arrayFlag) {
    DataStream<Row> out = input.flatMap(new FlatMapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String input, Collector<Row> collector) {
            Row row = null;
            try {
                Map map = JSON.parseObject(input, Map.class);
                row = convertMapToRow(map, fields);
                collector.collect(row);
            } catch (JSONException e) {
                List<Map> mapList = JSON.parseArray(input, Map.class);
                if (mapList.size() > 0) {
                    for (Map o : mapList) {
                        row = convertMapToRow(o, fields);
                        collector.collect(row); // The exception will happen here
                    }
                }
            } catch (Exception e) {
            }
        }
    });
    return out;
}

private static Row convertMapToRow(Map map, String[] fields) {
    int colSize = fields.length;
    Row row = new Row(colSize);
    for (int i = 0; i < colSize; i++) {
        row.setField(i, map.get(fields[i]));
    }
    return row;
}
{code}

> Class cast exception
> --------------------
>
>                 Key: FLINK-9273
>                 URL: https://issues.apache.org/jira/browse/FLINK-9273
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Streaming, Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Bob Lau
>            Priority: Major
>
> Exception stack is as follows:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
>     at com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
>     at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396)
> {code}
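The quoted trace suggests the root cause: LongSerializer.copy receives a String, i.e. the RowTypeInfo declares a field as Long while JSON.parseObject hands back whatever the parser produced (often a String), and convertMapToRow stores it into the Row unconverted. A minimal sketch of one possible fix follows; it assumes the typeInfos array already passed to deserializationToRow is forwarded, and it is not the reporter's code:

{code:java}
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

public class RowConversion {

    // Hypothetical variant of convertMapToRow: coerce each value to the class
    // declared in typeInfos so the downstream RowSerializer sees the types it
    // expects instead of raw parser output.
    static Row convertMapToRow(Map<String, Object> map, String[] fields,
                               TypeInformation<?>[] typeInfos) {
        Row row = new Row(fields.length);
        for (int i = 0; i < fields.length; i++) {
            Object value = map.get(fields[i]);
            Class<?> target = typeInfos[i].getTypeClass();
            if (value instanceof String && target == Long.class) {
                value = Long.valueOf((String) value); // "42" -> 42L, not "42"
            }
            row.setField(i, value);
        }
        return row;
    }
}
{code}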
[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems
[ https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460360#comment-16460360 ]

ASF GitHub Bot commented on FLINK-6306:
---------------------------------------
Github user stevenzwu commented on the issue:

    https://github.com/apache/flink/pull/4607

    @aljoscha is there any doc/write-up about the reworking of BucketingSink?

> Sink for eventually consistent file systems
> --------------------------------------------
>
>                 Key: FLINK-6306
>                 URL: https://issues.apache.org/jira/browse/FLINK-6306
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
>            Priority: Major
>         Attachments: eventually-consistent-sink
>
> Currently Flink provides the BucketingSink as an exactly-once method for writing out to a file system. It provides these guarantees by moving files through several stages and deleting or truncating files that get into a bad state. While this is a powerful abstraction, it causes issues with eventually consistent file systems such as Amazon's S3, where most operations (i.e. rename, delete, truncate) are not guaranteed to become consistent within a reasonable amount of time. Flink should provide a sink that provides exactly-once writes to a file system where only PUT operations are considered consistent.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user stevenzwu commented on the issue: https://github.com/apache/flink/pull/4607 @aljoscha is there any doc/write-up about the reworking of BucketingSink? ---
[jira] [Updated] (FLINK-8933) Avoid calling Class#newInstance
[ https://issues.apache.org/jira/browse/FLINK-8933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-8933:
--------------------------
    Description:
Class#newInstance is deprecated starting in Java 9 - https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw undeclared checked exceptions.
The suggested replacement is getDeclaredConstructor().newInstance(), which wraps the checked exceptions in InvocationTargetException.

> Avoid calling Class#newInstance
> --------------------------------
>
>                 Key: FLINK-8933
>                 URL: https://issues.apache.org/jira/browse/FLINK-8933
>             Project: Flink
>          Issue Type: Task
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Minor
>
> Class#newInstance is deprecated starting in Java 9 - https://bugs.openjdk.java.net/browse/JDK-6850612 - because it may throw undeclared checked exceptions.
> The suggested replacement is getDeclaredConstructor().newInstance(), which wraps the checked exceptions in InvocationTargetException.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
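For reference, a short sketch of the suggested migration (the class and method names below are generic placeholders, not code from the Flink repository). The replacement call reports constructor failures through the declared InvocationTargetException instead of rethrowing undeclared checked exceptions the way Class#newInstance does:

{code:java}
import java.lang.reflect.InvocationTargetException;

public class InstantiationExample {

    // Preferred replacement for the deprecated clazz.newInstance().
    static <T> T create(Class<T> clazz) {
        try {
            // Constructor exceptions arrive wrapped in InvocationTargetException
            // rather than being rethrown as undeclared checked exceptions.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException | InstantiationException
                | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Cannot instantiate " + clazz.getName(), e);
        }
    }
}
{code}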
[jira] [Updated] (FLINK-9048) LocalFlinkMiniClusterITCase#testLocalFlinkMiniClusterWithMultipleTaskManagers sometimes fails
[ https://issues.apache.org/jira/browse/FLINK-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9048:
--------------------------
    Description:
As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following:
{code}
testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)  Time elapsed: 41.681 sec  <<< FAILURE!
java.lang.AssertionError: Thread Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini cluster, but not shut down
    at org.junit.Assert.fail(Assert.java:88)
    at org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
{code}

> LocalFlinkMiniClusterITCase#testLocalFlinkMiniClusterWithMultipleTaskManagers sometimes fails
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9048
>                 URL: https://issues.apache.org/jira/browse/FLINK-9048
>             Project: Flink
>          Issue Type: Test
>            Reporter: Ted Yu
>            Priority: Minor
>
> As of commit e0bc37bef69f5376d03214578e9b95816add661b, I got the following:
> {code}
> testLocalFlinkMiniClusterWithMultipleTaskManagers(org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase)  Time elapsed: 41.681 sec  <<< FAILURE!
> java.lang.AssertionError: Thread Thread[ForkJoinPool.commonPool-worker-25,5,main] was started by the mini cluster, but not shut down
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.testLocalFlinkMiniClusterWithMultipleTaskManagers(LocalFlinkMiniClusterITCase.java:174)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9150) Prepare for Java 10
[ https://issues.apache.org/jira/browse/FLINK-9150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9150:
--------------------------
    Description:
Java 9 is not an LTS release.
When compiling with Java 10, I see the following compilation error:
{code}
[ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not resolve dependencies for project org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find artifact jdk.tools:jdk.tools:jar:1.6 at specified path /a/jdk-10/../lib/tools.jar -> [Help 1]
{code}

> Prepare for Java 10
> -------------------
>
>                 Key: FLINK-9150
>                 URL: https://issues.apache.org/jira/browse/FLINK-9150
>             Project: Flink
>          Issue Type: Task
>            Reporter: Ted Yu
>            Priority: Major
>
> Java 9 is not an LTS release.
> When compiling with Java 10, I see the following compilation error:
> {code}
> [ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not resolve dependencies for project org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find artifact jdk.tools:jdk.tools:jar:1.6 at specified path /a/jdk-10/../lib/tools.jar -> [Help 1]
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9281) LogBack not working
Tim created FLINK-9281:
---------------------------
             Summary: LogBack not working
                 Key: FLINK-9281
                 URL: https://issues.apache.org/jira/browse/FLINK-9281
             Project: Flink
          Issue Type: Bug
          Components: Logging
    Affects Versions: 1.4.2
            Reporter: Tim

I am trying to get Flink to work with Logback instead of Log4J, but it is not working. My setup follows the advice on this page: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster

* Flink v1.4.2 running a stand-alone cluster.
* Started the JobManager as a foreground process (bin/jobmanager.sh start-foreground cluster). I updated bin/flink-console.sh to reference logback.xml via -Dlogback.configurationFile=file:/path/to/logfile.
* Removed the log4j jars under lib/ (log4j-1.2.xx.jar and slf4j-log4j12-xxx.jar)
* Added the logback jars under lib/ (logback-classic, logback-core, log4j-over-slf4j.jar)

However, no log file is created. As a sanity check I also pointed the configuration at a non-existent logback.xml file (changed the path to a non-existent folder) just to see if any errors would appear on stdout, but nothing was printed.

Thanks

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
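One way to narrow such a problem down (an editor's suggestion, not part of the original report): SLF4J can report which logging backend it actually bound at runtime, which distinguishes "the logback jars are not being picked up" from "logback is loaded but misconfigured". A minimal check using only the public SLF4J API:

{code:java}
import org.slf4j.LoggerFactory;

public class LoggingBindingCheck {
    public static void main(String[] args) {
        // With logback-classic on the classpath this prints
        // ch.qos.logback.classic.LoggerContext; a log4j class here means the
        // old binding is still winning on the classpath.
        System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
    }
}
{code}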
[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...
[ https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460061#comment-16460061 ]

Kedar Mhaswade commented on FLINK-9267:
---------------------------------------
Wow! Thank you for the tip above. I have been able to talk to the cluster now. There are still serialization errors:
{noformat}
2018-05-01 12:30:17,598 ERROR Remoting - scala.Option; local class incompatible: stream classdesc serialVersionUID = -114498752079829388, local class serialVersionUID = -2062608324514658839
java.io.InvalidClassException: scala.Option; local class incompatible: stream classdesc serialVersionUID = -114498752079829388, local class serialVersionUID = -2062608324514658839
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}
but this gives me something to work with. Thanks again. We will soon close this as "not a Flink issue".

> Classloading issues when using RemoteEnvironment ...
> ------------------------------------------------------
>
>                 Key: FLINK-9267
>                 URL: https://issues.apache.org/jira/browse/FLINK-9267
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission
>    Affects Versions: 1.4.2
>         Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>            Reporter: Kedar Mhaswade
>            Priority: Major
>
> See these two threads:
> * [Nov 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
> * [April 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the {{RemoteEnvironment}}. The instructions to [reproduce are here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...
[ https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460019#comment-16460019 ]

Chesnay Schepler commented on FLINK-9267:
-----------------------------------------
Please add the following to your {{maven-shade-plugin}} configuration:
{code}
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>reference.conf</resource>
</transformer>
{code}
Source: https://stackoverflow.com/questions/31011243/no-configuration-setting-found-for-key-akka-version

> Classloading issues when using RemoteEnvironment ...
> ------------------------------------------------------
>
>                 Key: FLINK-9267
>                 URL: https://issues.apache.org/jira/browse/FLINK-9267
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission
>    Affects Versions: 1.4.2
>         Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>            Reporter: Kedar Mhaswade
>            Priority: Major
>
> See these two threads:
> * [Nov 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
> * [April 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the {{RemoteEnvironment}}. The instructions to [reproduce are here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...
[ https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460002#comment-16460002 ] Kedar Mhaswade commented on FLINK-9267: --- I also built Flink 1.3.1 from [source|https://archive.apache.org/dist/flink/flink-1.3.1/flink-1.3.1-src.tgz]. I still get the same result on the Gradoop server side! I really want to know what you did to be able to submit the Flink job from Gradoop server. > Classloading issues when using RemoteEnvironment ... > > > Key: FLINK-9267 > URL: https://issues.apache.org/jira/browse/FLINK-9267 > Project: Flink > Issue Type: Bug > Components: Job-Submission >Affects Versions: 1.4.2 > Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink > 1.6.0-SNAPSHOT. > Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM. >Reporter: Kedar Mhaswade >Priority: Major > > See these two threads: > * [Nov > 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E] > * [April > 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E] > Both these threads show the classloading problems with using the > {{RemoteEnvironment}}. The instructions to [reproduce are > here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E]. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...
[ https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459924#comment-16459924 ]

Kedar Mhaswade commented on FLINK-9267:
---------------------------------------
This is very strange! Thanks [~Zentol] for trying this out, but I have the following environment and it does not work for me:
# Mac OS High Sierra 10.13.4
# {{java version "1.8.0_161"}}
# Install the binary version of [Flink 1.3.1|https://archive.apache.org/dist/flink/flink-1.3.1/flink-1.3.1-bin-hadoop2-scala_2.11.tgz]. I believe this is the correct version. Let me know if I should use a different one.
# Clone [https://github.com/kedarmhaswade/gradoop_demo.git]
# cd gradoop_demo; git checkout -b dev
# mvn clean package; {{mvn -version}} prints:
{noformat}
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-10T08:41:47-08:00)
Maven home: /usr/local/Cellar/maven/3.3.9/libexec
Java version: 1.8.0_161, vendor: Oracle Corporation
Java home: /Users/kedar/.sdkman/candidates/java/8.0.161-oracle/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.4", arch: "x86_64", family: "mac"
{noformat}
# The head commit is: {{042e5f8 (HEAD -> dev, origin/dev) comment out the test; trying to reproduce Chesnay's environment for FLINK-9267}}.
# Copy the built {{target/gradoop-demo-0.2.0.jar}} to {{/lib}}.
# Start the Flink cluster: {{/bin/start-cluster.sh}}.
# Start the Gradoop demo server with the Flink cluster from step 9: {{java -cp target/classes:target/gradoop-demo-shaded.jar org.gradoop.demo.server.Server --jmhost localhost --jmport 6123}}
# Either use the browser to connect to {{http://localhost:2342/gradoop/html/cypher.html}} or run a simple curl query: {{curl -vL -X POST http://localhost:2342/keys/Example | jq}}
# The Gradoop demo server throws this exception:
{noformat}
org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
    at org.gradoop.demo.server.RequestHandler.getResponse(RequestHandler.java:470)
    at org.gradoop.demo.server.RequestHandler.createResponse(RequestHandler.java:453)
    at org.gradoop.demo.server.RequestHandler.executeCypher(RequestHandler.java:143)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
    at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
    at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer._service(GrizzlyContainer.java:222)
    at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer.service(GrizzlyContainer.java:192)
    at org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
    at org.glassfish.grizzly.http.server.HttpHandlerChain.service(HttpHandlerChain.java:196)
{noformat}
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459804#comment-16459804 ]

tarun razdan commented on FLINK-7789:
-------------------------------------
Hi Team,

Can anyone suggest an alternate method to handle the timeout exception until the issue gets resolved?

Cheers,
Tarun Razdan

> Add handler for Async IO operator timeouts
> -------------------------------------------
>
>                 Key: FLINK-7789
>                 URL: https://issues.apache.org/jira/browse/FLINK-7789
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Karthik Deivasigamani
>            Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. When a request times out, an exception is thrown and the job is restarted. It would be good to pass an AsyncIOTimeoutHandler which can be implemented by the user and passed in the constructor.
> Here is the discussion from the apache flink users mailing list
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
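A possible workaround, sketched here with the caveat that it assumes the ResultFuture-based AsyncFunction API of Flink 1.5+, where timeout(...) is an overridable default method: override timeout and emit a fallback record instead of letting the default implementation fail the job with a TimeoutException.

{code:java}
import java.util.Collections;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class TimeoutTolerantFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // issue the real asynchronous request here and complete resultFuture
        // from its callback ...
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // The default implementation completes the future exceptionally and
        // fails the job; emitting a marker element instead keeps it running.
        resultFuture.complete(Collections.singleton("TIMEOUT:" + input));
    }
}
{code}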
[jira] [Commented] (FLINK-9267) Classloading issues when using RemoteEnvironment ...
[ https://issues.apache.org/jira/browse/FLINK-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459784#comment-16459784 ]

Chesnay Schepler commented on FLINK-9267:
-----------------------------------------
I set up a local 1.3.1 cluster, freshly built from source, with no modifications to /lib or flink-conf.yaml.

I can successfully run the job after placing {{gradoop-demo-shaded.jar}} into the /lib folder.

I can also successfully submit jobs using the {{RemoteEnvironment}} after hard-coding the absolute path to the fat-jar ({{gradoop-demo-shaded.jar}}) in the {{RemoteEnvironment}} constructor call in {{Server#getExecutionEnvironment}}, at commit b7737c43364ebd0eb8724be64f1f53b60c845ffd.

As it stands I cannot reproduce the problem. I used Maven 3.5 and OpenJDK 1.8.0_162.

> Classloading issues when using RemoteEnvironment ...
> ------------------------------------------------------
>
>                 Key: FLINK-9267
>                 URL: https://issues.apache.org/jira/browse/FLINK-9267
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission
>    Affects Versions: 1.4.2
>         Environment: I have tried with Flink 1.3.3, Flink 1.4.2 and Flink 1.6.0-SNAPSHOT.
> Oracle JDK 1.8.0_161 on Mac with a local cluster containing one JM and one TM.
>            Reporter: Kedar Mhaswade
>            Priority: Major
>
> See these two threads:
> * [Nov 2017|http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c66d706ad-5e47-4bd2-a7af-1db41cce7...@gmail.com%3E]
> * [April 2018|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3CCABzSAw_sQ149F8%2BS2Mpg%3DCH75F_7PuUx3hQYjdnLmOUL5O23oQ%40mail.gmail.com%3E]
> Both these threads show the classloading problems with using the {{RemoteEnvironment}}. The instructions to [reproduce are here|http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3ccabzsaw_0qw_44xvjdtwz9y+s-pfgg2+cfx2c46gwqqngjj0...@mail.gmail.com%3E].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5928

    @StephanEwen @alpinegizmo Thanks for your comments, I will revert the changes to `config.md` and address the comments concerning `file://` being only for local setups.

---
[jira] [Commented] (FLINK-9261) Regression - Flink CLI and Web UI not working when SSL is enabled
[ https://issues.apache.org/jira/browse/FLINK-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459686#comment-16459686 ]

Chesnay Schepler commented on FLINK-9261:
-----------------------------------------
Given that the UI and client in part go through the same REST calls, I'm not sure whether we can implement things such that the client is authenticated but the UI isn't.

What we could maybe do is separate the REST API into a monitoring part (getting details for a job/jm) and a control part (submitting jobs, savepoints, cancellation) with separate SSL settings.

> Regression - Flink CLI and Web UI not working when SSL is enabled
> ------------------------------------------------------------------
>
>                 Key: FLINK-9261
>                 URL: https://issues.apache.org/jira/browse/FLINK-9261
>             Project: Flink
>          Issue Type: Bug
>          Components: Client, Network, Web Client
>    Affects Versions: 1.5.0
>            Reporter: Edward Rojas
>            Priority: Blocker
>              Labels: regression
>             Fix For: 1.5.0
>
> When the *security.ssl.enabled* config is set to true, the Web UI is no longer reachable; there are no logs on the jobmanager.
>
> When setting *web.ssl.enabled* to false (keeping security.ssl.enabled set to true), the dashboard is not reachable and there is the following exception on the jobmanager:
> {code:java}
> WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 474554202f20485454502f312e310d0a486f73743a206c6f63616c686f73743a383038310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a557067726164652d496e7365637572652d52657175657374733a20310d0a557365722d4167656e743a204d6f7a696c6c612f352e3020284d6163696e746f73683b20496e74656c204d6163204f5320582031305f31335f3329204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f36352e302e32352e313831205361666172692f3533372e33360d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c696d6167652f776562702c696d6167652f61706e672c2a2f2a3b713d302e380d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2c656e2d47423b713d302e392c65732d3431393b713d302e382c65733b713d302e372c66722d46523b713d302e362c66723b713d302e350d0a436f6f6b69653a20496465612d39326365626136363d39396464633637632d613838382d346439332d396166612d3737396631373636326264320d0a49662d4d6f6469666965642d53696e63653a205468752c2032362041707220323031382031313a30313a313520474d540d0a0d0a
>     at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:940)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Also, when trying to use the Flink CLI, it gets stuck on "Waiting for response..." and there are no error messages on the jobmanager. None of the commands (list, run, etc.) works.
>
> Taskmanagers are able to register with the Jobmanager, so the SSL configuration is good.
>
> SSL configuration:
> security.ssl.enabled: true
> security.ssl.keystore: /path/to/keystore
> security.ssl.keystore-password:
> security.ssl.key-password:
> security.ssl.truststore: /path/to/truststore
> security.ssl.truststore-password:
> web.ssl.enabled: false
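The hex payload quoted in the NotSslRecordException above is itself informative: decoded as ASCII it begins "GET / HTTP/1.1", i.e. a plain-text HTTP request arriving on a port that expects TLS records. A small standalone decoder to verify this (the hex literal below is the leading portion of the payload from the stack trace):

{code:java}
public class HexPayloadDecoder {
    public static void main(String[] args) {
        // Leading bytes of the "not an SSL/TLS record" payload quoted above.
        String hex = "474554202f20485454502f312e310d0a486f73743a206c6f63616c686f73743a38303831";
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < hex.length(); i += 2) {
            sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        // Prints: GET / HTTP/1.1
        //         Host: localhost:8081
        System.out.println(sb);
    }
}
{code}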
[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459665#comment-16459665 ] yuqi commented on FLINK-9196: - [~gjy] I see, thanks for your suggestion. > YARN: Flink binaries are not deleted from HDFS after cluster shutdown > - > > Key: FLINK-9196 > URL: https://issues.apache.org/jira/browse/FLINK-9196 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: 0001-xxx.patch > > > When deploying on YARN in flip6 mode, the Flink binaries are not deleted from > HDFS after the cluster shuts down. > *Steps to reproduce* > # Submit job in YARN job mode, non-detached: > {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster > -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat} > # Check contents of {{/user/hadoop/.flink/}} on HDFS after > job is finished: > {noformat} > [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls > /user/hadoop/.flink/application_1523966184826_0016 > Found 6 items > -rw-r--r-- 1 hadoop hadoop583 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml > -rw-r--r-- 1 hadoop hadoop332 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp > -rw-r--r-- 1 hadoop hadoop 89779342 2018-04-02 17:08 > /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar > drwxrwxrwx - hadoop hadoop 0 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/lib > -rw-r--r-- 1 hadoop hadoop 1939 2018-04-02 15:37 > /user/hadoop/.flink/application_1523966184826_0016/log4j.properties > -rw-r--r-- 1 hadoop hadoop 2331 2018-04-02 15:37 > /user/hadoop/.flink/application_1523966184826_0016/logback.xml > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6420) Cleaner CEP API to specify conditions between events
[ https://issues.apache.org/jira/browse/FLINK-6420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459591#comment-16459591 ]

Tarush Grover commented on FLINK-6420:
--------------------------------------
Can I take this up?

> Cleaner CEP API to specify conditions between events
> -----------------------------------------------------
>
>                 Key: FLINK-6420
>                 URL: https://issues.apache.org/jira/browse/FLINK-6420
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Elias Levy
>            Priority: Minor
>
> Flink 1.3 will introduce so-called iterative conditions, which allow the predicate to look up events already matched by conditions in the pattern. This permits specifying conditions between matched events, similar to a conditional join between tables in SQL. Alas, the API could be simplified to specify such conditions more declaratively.
> At the moment you have to do something like
> {code}
> Pattern
>   .begin[Foo]("first")
>   .where( first => first.baz == 1 )
>   .followedBy("next")
>   .where({ (next, ctx) =>
>     val first = ctx.getEventsForPattern("first").next
>     first.bar == next.bar && next.boo == "x"
>   })
> {code}
> which is not very clean. It would be friendlier if you could do something like:
> {code}
> Pattern
>   .begin[Foo]("first")
>   .where( first => first.baz == 1 )
>   .followedBy("next")
>   .relatedTo("first", { (first, next) => first.bar == next.bar })
>   .where( next => next.boo == "x" )
> {code}
> Something along these lines would work well when the condition being tested against matches a single event (single quantifier).
> If the condition being tested can accept multiple events (e.g. times quantifier), two other methods could be used, {{relatedToAny}} and {{relatedToAll}}, each of which takes a predicate function. In both cases each previously accepted element of the requested condition is evaluated against the predicate. In the former case, if any evaluation returns true, the condition is satisfied. In the latter case, all evaluations must return true for the condition to be satisfied.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9280) Extend JobSubmitHandler to accept jar files
Chesnay Schepler created FLINK-9280:
---------------------------------------
             Summary: Extend JobSubmitHandler to accept jar files
                 Key: FLINK-9280
                 URL: https://issues.apache.org/jira/browse/FLINK-9280
             Project: Flink
          Issue Type: New Feature
          Components: Job-Submission, REST
    Affects Versions: 1.5.0
            Reporter: Chesnay Schepler

The job submission through the CLI first uploads all required jars to the blob server, sets the blob keys in the job graph, and then uploads this graph to the {{JobSubmitHandler}}, which submits it to the Dispatcher.

This process has the downside that it requires jars to be uploaded to the blobserver before submitting the job graph, which does not happen via REST.

I propose an extension to the {{JobSubmitHandler}} to also accept an optional list of jar files that were previously uploaded through the {{JarUploadHandler}}. If present, the handler would upload these jars to the blobserver and set the blob keys.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)