[jira] [Assigned] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)
[ https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned BEAM-2995:
--------------------------------------

    Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)

> can't read/write hdfs in Flink CLUSTER(Standalone)
> --------------------------------------------------
>
>                 Key: BEAM-2995
>                 URL: https://issues.apache.org/jira/browse/BEAM-2995
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.2.0
>            Reporter: huangjianhuang
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> I just wrote a simple demo like:
> {code:java}
> Configuration conf = new Configuration();
> conf.set("fs.default.name", "hdfs://localhost:9000");
> // other code
> p.apply("ReadLines", TextIO.read().from("hdfs://localhost:9000/tmp/words"))
>  .apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
> {code}
> It works in Flink local mode with the command:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
>   -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar"
> {code}
> but does not work in CLUSTER mode:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
>   -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar --flinkMaster=localhost:6123"
> {code}
> It seems the Flink cluster treats HDFS as the local file system.
> The read-side log from flink-jobmanager.log is:
> {code:java}
> 2017-09-27 20:17:37,962 INFO org.apache.flink.runtime.jobmanager.JobManager - Successfully ran initialization on master in 136 ms.
> 2017-09-27 20:17:37,968 INFO org.apache.beam.sdk.io.FileBasedSource - {color:red}Filepattern hdfs://localhost:9000/tmp/words2 matched 0 files with total size 0{color}
> 2017-09-27 20:17:37,968 INFO org.apache.beam.sdk.io.FileBasedSource - Splitting filepattern hdfs://localhost:9000/tmp/words2 into bundles of size 0 took 0 ms and produced 0 files and 0 bundles
> {code}
> The write-side error message is:
> {code:java}
> Caused by: java.lang.ClassCastException: {color:red}org.apache.beam.sdk.io.hdfs.HadoopResourceId cannot be cast to org.apache.beam.sdk.io.LocalResourceId{color}
> 	at org.apache.beam.sdk.io.LocalFileSystem.create(LocalFileSystem.java:77)
> 	at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:256)
> 	at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
> 	at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:922)
> 	at org.apache.beam.sdk.io.FileBasedSink$Writer.openUnwindowed(FileBasedSink.java:884)
> 	at org.apache.beam.sdk.io.WriteFiles.finalizeForDestinationFillEmptyShards(WriteFiles.java:909)
> 	at org.apache.beam.sdk.io.WriteFiles.access$900(WriteFiles.java:110)
> 	at org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:858)
> {code}
> Can somebody help me? I've tried everything and just can't work it out [cry]
> https://issues.apache.org/jira/browse/BEAM-2457

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
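For reference, in Beam 2.x the supported way to point TextIO at HDFS is to register the cluster through HadoopFileSystemOptions (from the beam-sdks-java-io-hadoop-file-system module) rather than a free-standing Hadoop Configuration. A minimal sketch of the reporter's demo along those lines, assuming that module is on the classpath (the class and path names are taken from the report; the options wiring is an assumption about their setup):

{code:java}
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class FlinkWithHDFS {
  public static void main(String[] args) {
    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(HadoopFileSystemOptions.class);
    // Register the namenode with Beam's FileSystems layer instead of
    // setting fs.default.name on a Configuration that Beam never sees.
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:9000");
    options.setHdfsConfiguration(Collections.singletonList(conf));

    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from("hdfs://localhost:9000/tmp/words"))
     .apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
    p.run();
  }
}
{code}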
[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)
[ https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352114#comment-16352114 ]

Dawid Wysakowicz commented on BEAM-2995:
----------------------------------------

[~huangjianhuang] I could not reproduce your exact problem, but in my case it does not connect to hdfs if I do not add the proper transformer, as explained in BEAM-2457:
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <!-- the transformer referenced in BEAM-2457 -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}

> can't read/write hdfs in Flink CLUSTER(Standalone)
> --------------------------------------------------
>
>                 Key: BEAM-2995
>                 URL: https://issues.apache.org/jira/browse/BEAM-2995
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.2.0
>            Reporter: huangjianhuang
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> I just wrote a simple demo like:
> {code:java}
> Configuration conf = new Configuration();
> conf.set("fs.default.name", "hdfs://localhost:9000");
> // other code
> p.apply("ReadLines", TextIO.read().from("hdfs://localhost:9000/tmp/words"))
>  .apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
> {code}
> It works in Flink local mode with the command:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
>   -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar"
> {code}
> but does not work in CLUSTER mode:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
>   -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar --flinkMaster=localhost:6123"
> {code}
> It seems the Flink cluster treats HDFS as the local file system.
> The read-side log from flink-jobmanager.log is:
> {code:java}
> 2017-09-27 20:17:37,962 INFO org.apache.flink.runtime.jobmanager.JobManager - Successfully ran initialization on master in 136 ms.
> 2017-09-27 20:17:37,968 INFO org.apache.beam.sdk.io.FileBasedSource - {color:red}Filepattern hdfs://localhost:9000/tmp/words2 matched 0 files with total size 0{color}
> 2017-09-27 20:17:37,968 INFO org.apache.beam.sdk.io.FileBasedSource - Splitting filepattern hdfs://localhost:9000/tmp/words2 into bundles of size 0 took 0 ms and produced 0 files and 0 bundles
> {code}
> The write-side error message is:
> {code:java}
> Caused by: java.lang.ClassCastException: {color:red}org.apache.beam.sdk.io.hdfs.HadoopResourceId cannot be cast to org.apache.beam.sdk.io.LocalResourceId{color}
> 	at org.apache.beam.sdk.io.LocalFileSystem.create(LocalFileSystem.java:77)
> 	at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:256)
> 	at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
> 	at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:922)
> 	at org.apache.beam.sdk.io.FileBasedSink$Writer.openUnwindowed(FileBasedSink.java:884)
> 	at org.apache.beam.sdk.io.WriteFiles.finalizeForDestinationFillEmptyShards(WriteFiles.java:909)
> 	at org.apache.beam.sdk.io.WriteFiles.access$900(WriteFiles.java:110)
> 	at org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:858)
> {code}
> Can somebody help me? I've tried everything and just can't work it out [cry]
> https://issues.apache.org/jira/browse/BEAM-2457

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
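The transformer matters because Beam discovers filesystems through java.util.ServiceLoader: without the ServicesResourceTransformer, shading can overwrite the merged META-INF/services entries, and the hdfs:// scheme silently falls back to LocalFileSystem, which matches the ClassCastException above. A hedged sketch for checking what a shaded jar actually registers (FileSystemRegistrar is Beam's public service interface; the main class here is illustrative):

{code:java}
import java.util.ServiceLoader;
import org.apache.beam.sdk.io.FileSystemRegistrar;

public class ListFileSystemRegistrars {
  public static void main(String[] args) {
    // Run against the shaded jar: if no Hadoop registrar is printed,
    // hdfs:// paths will resolve through LocalFileSystem.
    for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
      System.out.println(registrar.getClass().getName());
    }
  }
}
{code}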
[jira] [Commented] (BEAM-1107) Display user names for steps in the Flink Web UI
[ https://issues.apache.org/jira/browse/BEAM-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400471#comment-16400471 ]

Dawid Wysakowicz commented on BEAM-1107:
----------------------------------------

Fixed in [BEAM-3043]

> Display user names for steps in the Flink Web UI
> ------------------------------------------------
>
>                 Key: BEAM-1107
>                 URL: https://issues.apache.org/jira/browse/BEAM-1107
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Daniel Halperin
>            Assignee: Aljoscha Krettek
>            Priority: Major
>         Attachments: screenshot-1.png
>
> [copying in-person / email discussion at Strata Singapore to JIRA]
> The FlinkBatchTransformTranslators use transform.getName() [1] -- this is the "SDK name" for the transform.
> The "user name" for the transform is not available here; it is in fact on the TransformHierarchy.Node as node.getFullName() [2].
> getFullName() is used in some places in Flink, but not when setting step names.
> I drafted a quick commit that sort of propagates the user names to the web UI (but only for DataSource, and still too verbose):
> https://github.com/dhalperi/incubator-beam/commit/a2f1fb06b22a85ec738e4f2a604c9a129891916c
> Before this change, the "ReadLines" step showed up as: "DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))"
> With this change, it shows up as "DataSource (at ReadLines/Read (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))", which I think is closer. [I'd still like it to JUST be "ReadLines/Read", e.g.]
> Thoughts?
> [1] https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java#L129
> [2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java#L252

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
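The mechanics behind the rename are plain Flink operator naming: whatever string the translator hands to an operator is what the Web UI displays. A minimal, illustrative Flink-side sketch (this is the user-facing DataSet API, not the runner's internal translator code; the quoted names come from the examples above):

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class OperatorNameDemo {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // .name(...) is what surfaces in the Flink Web UI; the proposed fix amounts
    // to calling it with node.getFullName() ("ReadLines/Read") instead of
    // transform.getName() ("Read(CompressedSource)").
    DataSet<String> lines = env.fromElements("a", "b").name("ReadLines/Read");
    lines.print();
  }
}
{code}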
[jira] [Resolved] (BEAM-1107) Display user names for steps in the Flink Web UI
[ https://issues.apache.org/jira/browse/BEAM-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz resolved BEAM-1107.
------------------------------------
       Resolution: Duplicate
    Fix Version/s: Not applicable

> Display user names for steps in the Flink Web UI
> ------------------------------------------------
>
>                 Key: BEAM-1107
>                 URL: https://issues.apache.org/jira/browse/BEAM-1107
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Daniel Halperin
>            Assignee: Aljoscha Krettek
>            Priority: Major
>             Fix For: Not applicable
>
>         Attachments: screenshot-1.png
>
> [copying in-person / email discussion at Strata Singapore to JIRA]
> The FlinkBatchTransformTranslators use transform.getName() [1] -- this is the "SDK name" for the transform.
> The "user name" for the transform is not available here; it is in fact on the TransformHierarchy.Node as node.getFullName() [2].
> getFullName() is used in some places in Flink, but not when setting step names.
> I drafted a quick commit that sort of propagates the user names to the web UI (but only for DataSource, and still too verbose):
> https://github.com/dhalperi/incubator-beam/commit/a2f1fb06b22a85ec738e4f2a604c9a129891916c
> Before this change, the "ReadLines" step showed up as: "DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))"
> With this change, it shows up as "DataSource (at ReadLines/Read (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))", which I think is closer. [I'd still like it to JUST be "ReadLines/Read", e.g.]
> Thoughts?
> [1] https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java#L129
> [2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java#L252

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (BEAM-3281) PTransform name not being propagated to the Flink Web UI
[ https://issues.apache.org/jira/browse/BEAM-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz resolved BEAM-3281.
------------------------------------
       Resolution: Duplicate
    Fix Version/s: 2.5.0

> PTransform name not being propagated to the Flink Web UI
> --------------------------------------------------------
>
>                 Key: BEAM-3281
>                 URL: https://issues.apache.org/jira/browse/BEAM-3281
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0
>            Reporter: Thalita Vergilio
>            Assignee: Aljoscha Krettek
>            Priority: Minor
>              Labels: flink
>             Fix For: 2.5.0
>
>         Attachments: flink-dashboard.PNG
>
> This could be related to BEAM-1107, which was logged for Flink batch processing.
> I am experiencing a similar issue for stream processing. I would have expected the name passed to
> {code:java}
> pipeline.apply(String name, PTransform root)
> {code}
> to be propagated to the Flink Web UI.
> The documentation seems to suggest that this was the intended functionality:
> https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-
> Here is some sample code setting the name:
> {code:java}
> p.apply("Apply Windowing Function", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>  .apply("Transform the Pipeline to Key by Window",
>      ParDo.of(
>          new DoFn, KV>>() {
>              @ProcessElement
>              public void processElement(ProcessContext context, IntervalWindow window) {
>                  context.output(KV.of(window, context.element()));
>              }
>          }))
>  .apply("Group by Key (window)", GroupByKey.create())
>  .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
>  .apply("Write output to Kafka",
>      KafkaIO.write()
>          .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
>          .withTopic("results")
>          .withKeySerializer(IntervalWindowResultSerialiser.class)
>          .withValueSerializer(PueResultSerialiser.class)
>  );
> {code}
> I will upload a screenshot of the results.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-3281) PTransform name not being propagated to the Flink Web UI
[ https://issues.apache.org/jira/browse/BEAM-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400476#comment-16400476 ]

Dawid Wysakowicz commented on BEAM-3281:
----------------------------------------

Fixed in [BEAM-3043]

> PTransform name not being propagated to the Flink Web UI
> --------------------------------------------------------
>
>                 Key: BEAM-3281
>                 URL: https://issues.apache.org/jira/browse/BEAM-3281
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0
>            Reporter: Thalita Vergilio
>            Assignee: Aljoscha Krettek
>            Priority: Minor
>              Labels: flink
>             Fix For: 2.5.0
>
>         Attachments: flink-dashboard.PNG
>
> This could be related to BEAM-1107, which was logged for Flink batch processing.
> I am experiencing a similar issue for stream processing. I would have expected the name passed to
> {code:java}
> pipeline.apply(String name, PTransform root)
> {code}
> to be propagated to the Flink Web UI.
> The documentation seems to suggest that this was the intended functionality:
> https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-
> Here is some sample code setting the name:
> {code:java}
> p.apply("Apply Windowing Function", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>  .apply("Transform the Pipeline to Key by Window",
>      ParDo.of(
>          new DoFn, KV>>() {
>              @ProcessElement
>              public void processElement(ProcessContext context, IntervalWindow window) {
>                  context.output(KV.of(window, context.element()));
>              }
>          }))
>  .apply("Group by Key (window)", GroupByKey.create())
>  .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
>  .apply("Write output to Kafka",
>      KafkaIO.write()
>          .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
>          .withTopic("results")
>          .withKeySerializer(IntervalWindowResultSerialiser.class)
>          .withValueSerializer(PueResultSerialiser.class)
>  );
> {code}
> I will upload a screenshot of the results.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (BEAM-1107) Display user names for steps in the Flink Web UI
[ https://issues.apache.org/jira/browse/BEAM-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated BEAM-1107:
-----------------------------------
    Fix Version/s:     (was: Not applicable)
                   2.5.0

> Display user names for steps in the Flink Web UI
> ------------------------------------------------
>
>                 Key: BEAM-1107
>                 URL: https://issues.apache.org/jira/browse/BEAM-1107
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Daniel Halperin
>            Assignee: Aljoscha Krettek
>            Priority: Major
>             Fix For: 2.5.0
>
>         Attachments: screenshot-1.png
>
> [copying in-person / email discussion at Strata Singapore to JIRA]
> The FlinkBatchTransformTranslators use transform.getName() [1] -- this is the "SDK name" for the transform.
> The "user name" for the transform is not available here; it is in fact on the TransformHierarchy.Node as node.getFullName() [2].
> getFullName() is used in some places in Flink, but not when setting step names.
> I drafted a quick commit that sort of propagates the user names to the web UI (but only for DataSource, and still too verbose):
> https://github.com/dhalperi/incubator-beam/commit/a2f1fb06b22a85ec738e4f2a604c9a129891916c
> Before this change, the "ReadLines" step showed up as: "DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))"
> With this change, it shows up as "DataSource (at ReadLines/Read (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))", which I think is closer. [I'd still like it to JUST be "ReadLines/Read", e.g.]
> Thoughts?
> [1] https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java#L129
> [2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java#L252

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (BEAM-3703) java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
[ https://issues.apache.org/jira/browse/BEAM-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated BEAM-3703:
-----------------------------------
    Description: 
I am trying to read from a file and write to Kafka on Google Cloud, and I am getting the following error:
{code}
org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
	at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
	at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
{code}
{code}
.apply(KafkaIO.write()
    .withBootstrapServers("ip1:9092,ip2:9092")
    .withTopic("feed")
    .withValueSerializer(StringSerializer.class)
    .withKeySerializer(StringSerializer.class)
    //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
    //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
    .values() // writes values to Kafka with default key
{code}
Kafka is running on Google Cloud (Bitnami) and I am using the Flink runner.
How do I pass security information to KafkaIO?
was:
I am trying to read from file and write to Kafka in google cloud kafka and getting following error:

org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
	at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
	at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.

.apply(KafkaIO.write()
    .withBootstrapServers("ip1:9092,ip2:9092")
    .withTopic("feed")
    .withValueSerializer(StringSerializer.class)
    .withKeySerializer(StringSerializer.class)
    //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
    //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
    .values() // writes values to Kafka with default key

Kafka is running on google cloud bitnami and I am using Flink runner
How do I pass security information to Kafka IO?


> java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-3703
>                 URL: https://issues.apache.org/jira/browse/BEAM-3703
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-flink
>    Affects Versions: 2.2.0
>            Reporter: jagdeep sihota
>            Assignee: Raghu Angadi
>
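On the final question: in the Beam 2.x KafkaIO of that era, arbitrary producer settings, including the security keys commented out above, go through updateProducerProperties. A hedged sketch (the broker list, mechanism, and JAAS line are placeholders for whatever the Bitnami brokers are actually configured with):

{code:java}
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

class SecureKafkaWrite {
  static KafkaIO.Write<String, String> secureWrite() {
    return KafkaIO.<String, String>write()
        .withBootstrapServers("ip1:9092,ip2:9092")
        .withTopic("feed")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // Producer properties are forwarded verbatim to the Kafka client.
        .updateProducerProperties(ImmutableMap.<String, Object>of(
            "security.protocol", "SASL_PLAINTEXT",
            "sasl.mechanism", "PLAIN",
            "sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user\" password=\"secret\";"));
  }
}
{code}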
[jira] [Assigned] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned BEAM-3414:
--------------------------------------

    Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> In my demo, I read data from Kafka and count globally, finally outputting the total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>         .apply("Read from kafka",
>                 KafkaIO.read()
>                         //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                         .withBootstrapServers("localhost:9092")
>                         .withTopic("recharge")
>                         .withKeyDeserializer(StringDeserializer.class)
>                         .withValueDeserializer(StringDeserializer.class)
>                         .withoutMetadata()
>         )
>         .apply(Values.create())
>         .apply(Window.into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>         )
>         .apply(Count.globally())
>         .apply("output",
>                 ParDo.of(new DoFn() {
>                     @ProcessElement
>                     public void process(ProcessContext context) {
>                         System.out.println("---get at: " + Instant.now() + "--");
>                         System.out.println(context.element());
>                     }
>                 }));
> {code}
> The result should be displayed 5s after I send the first piece of data, but sometimes nothing is displayed after I send data. The output below shows what I got in one test:
> (can't upload a pic, described as text)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> ---get at: 2018-01-05T06:34:36.668Z--
> 681
> Send 681Msg at: 2018-01-05T06:34:47.166
> ---get at: 2018-01-05T06:34:52.284Z--
> 1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> ---get at: 2018-01-05T06:35:22.112Z--
> 2044
> {code}
> Btw, the code works fine with the direct runner.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
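Because the report says the DirectRunner behaves correctly, a deterministic way to pin the expected firing down is TestStream, which advances processing time explicitly instead of depending on wall-clock timing. A hedged repro sketch mirroring the window/trigger setup above (intended for the DirectRunner; TestStream support on the Flink runner was limited at the time):

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class TriggerRepro {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(TestStream.create(StringUtf8Coder.of())
            .addElements("681")
            // Push processing time past the 5s delay so the trigger must fire.
            .advanceProcessingTime(Duration.standardSeconds(6))
            .advanceWatermarkToInfinity())
     .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(5))))
            .accumulatingFiredPanes())
     .apply(Count.globally())
     .apply(ParDo.of(new DoFn<Long, Void>() {
       @ProcessElement
       public void process(ProcessContext c) {
         System.out.println("count: " + c.element());
       }
     }));
    p.run().waitUntilFinish();
  }
}
{code}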
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406119#comment-16406119 ]

Dawid Wysakowicz commented on BEAM-3414:
----------------------------------------

I think it might be the same problem as noticed in BEAM-3863

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> In my demo, I read data from Kafka and count globally, finally outputting the total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>         .apply("Read from kafka",
>                 KafkaIO.read()
>                         //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                         .withBootstrapServers("localhost:9092")
>                         .withTopic("recharge")
>                         .withKeyDeserializer(StringDeserializer.class)
>                         .withValueDeserializer(StringDeserializer.class)
>                         .withoutMetadata()
>         )
>         .apply(Values.create())
>         .apply(Window.into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>         )
>         .apply(Count.globally())
>         .apply("output",
>                 ParDo.of(new DoFn() {
>                     @ProcessElement
>                     public void process(ProcessContext context) {
>                         System.out.println("---get at: " + Instant.now() + "--");
>                         System.out.println(context.element());
>                     }
>                 }));
> {code}
> The result should be displayed 5s after I send the first piece of data, but sometimes nothing is displayed after I send data. The output below shows what I got in one test:
> (can't upload a pic, described as text)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> ---get at: 2018-01-05T06:34:36.668Z--
> 681
> Send 681Msg at: 2018-01-05T06:34:47.166
> ---get at: 2018-01-05T06:34:52.284Z--
> 1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> ---get at: 2018-01-05T06:35:22.112Z--
> 2044
> {code}
> Btw, the code works fine with the direct runner.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-3225) Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
[ https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401800#comment-16401800 ]

Dawid Wysakowicz commented on BEAM-3225:
----------------------------------------

Hi, I've tried to reproduce the _"allowed lateness configuration dictates that only non empty panes should be trigger!!!"_ behaviour but could not do it by any means. I also could not find any bug or issue in the Flink runner that could be the reason for such behaviour. [~pawelbartoszek], are you able to provide some reliable way to reproduce the problem?

> Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3225
>                 URL: https://issues.apache.org/jira/browse/BEAM-3225
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Pawel Bartoszek
>            Assignee: Aljoscha Krettek
>            Priority: Critical
>
> *Context*
> I ran my [test code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80] against different triggers and runners. My original problem was that, when writing to a file sink, files weren't always produced in a deterministic way; please refer to [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151]. When I started looking at the WriteFiles class I noticed that the file sink implementation includes multiple GroupByKey transformations. Knowing that, I wrote test code that uses multiple GroupByKey transformations, and concluded that the GroupByKey support for After(Synchronised)ProcessingTime triggers is a bit buggy(?), which also influences the file sink. When I ran my job using the Dataflow runner I got the expected output.
> *About test code*
> The job counts how many A and B elements it received within 30 sec windows using Count.perElement. Then I use GroupByKey to fire every time the count has increased.
> Below I outline the expected standard output:
> {code:java}
> Let's assume all events are received in the same 30 seconds window.
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> With my trigger configuration I would expect that for every new element 'After count' is printed with a new value, followed by 'Final group by' with a new counter. 'Final group by' then represents the history of counters.
> {code}
> *Problem*
> The 'Final group by' trigger doesn't always go off, although the trigger setup would suggest it should. This behaviour differs across runners and Beam versions.
>
> *My observations when using Pubsub*
> Trigger configuration:
> {code:java}
> Window.into(FixedWindows.of(standardSeconds(30)))
>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>         .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>         .accumulatingFiredPanes())
> {code}
>
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
>
[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401919#comment-16401919 ]

Dawid Wysakowicz commented on BEAM-3494:
----------------------------------------

[~suganyap] Could you elaborate a bit more on how you enable checkpointing? Do I understand correctly that you pass {{checkpointingInterval}} as a CLI parameter and that {{state.backend}} is set in flink-conf.yaml? Also, could you say more about what it means that the aggregated data is not restored, ideally with a simple example?

> Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
> ----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3494
>                 URL: https://issues.apache.org/jira/browse/BEAM-3494
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: suganya
>            Priority: Major
>
> We have a Beam project which consumes events from Kafka and does a groupBy in a time window (5 mins); after the window elapses it pushes the events downstream for merge. This project is deployed using Flink, and we have enabled checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). Event offsets are checkpointed before the entire DAG (groupBy and merge) finishes. So in case of any restart from the task manager, the new task starts from the last successful checkpoint, but we could not get the aggregated snapshot data (data from the groupBy task) from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, but couldn't get the aggregated data up to the checkpoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
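For reference, this is the wiring the question is about on the runner side; a minimal sketch, assuming the interval is passed through FlinkPipelineOptions while the state backend still comes from flink-conf.yaml (state.backend: filesystem):

{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class CheckpointedOptions {
  static FlinkPipelineOptions fromArgs(String[] args) {
    FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // 5-minute checkpoints, matching the interval described in the report.
    options.setCheckpointingInterval(300_000L);
    return options;
  }
}
{code}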
[jira] [Assigned] (BEAM-2873) Detect number of shards for file sink in Flink Streaming Runner
[ https://issues.apache.org/jira/browse/BEAM-2873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned BEAM-2873:
--------------------------------------

    Assignee: Dawid Wysakowicz

> Detect number of shards for file sink in Flink Streaming Runner
> ---------------------------------------------------------------
>
>                 Key: BEAM-2873
>                 URL: https://issues.apache.org/jira/browse/BEAM-2873
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> [~reuvenlax] mentioned that this is done for the Dataflow runner and that the default behaviour on Flink can be somewhat surprising for users.
> ML entry: https://www.mail-archive.com/dev@beam.apache.org/msg02665.html:
> This is how the file sink has always worked in Beam. If no sharding is specified, then this means runner-determined sharding, and by default that is one file per bundle. If Flink has small bundles, then I suggest using the withNumShards method to explicitly pick the number of output shards.
> The Flink runner can detect that runner-determined sharding has been chosen, and override it with a specific number of shards. For example, the Dataflow streaming runner (which as you mentioned also has small bundles) detects this case and sets the number of output file shards based on the number of workers in the worker pool. [Here|https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354] is the code that does this; it should be quite simple to do something similar for Flink, and then there will be no need for users to explicitly call withNumShards themselves.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
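Until the runner override lands, the user-side mitigation named in the thread is explicit sharding. A minimal sketch (paths and shard count are placeholders):

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExplicitShards {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(TextIO.read().from("/tmp/input/*"))
     // Runner-determined sharding means one file per bundle on Flink;
     // pinning the shard count sidesteps that until the runner overrides it.
     .apply(TextIO.write().to("/tmp/output/part").withNumShards(4));
    p.run();
  }
}
{code}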
[jira] [Assigned] (BEAM-3359) Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner
[ https://issues.apache.org/jira/browse/BEAM-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned BEAM-3359:
--------------------------------------

    Assignee: Dawid Wysakowicz

> Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner
> ---------------------------------------------------------------
>
>                 Key: BEAM-3359
>                 URL: https://issues.apache.org/jira/browse/BEAM-3359
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Łukasz Gajowy
>            Assignee: Dawid Wysakowicz
>            Priority: Minor
>
> In TestFlinkRunner's constructor there is a line like this:
> {{options.setFlinkMaster("\[auto\]");}}
> which basically ignores any "flinkMaster" provided earlier (e.g. using the command line), leading to errors that are hard to find (for example, wondering: "I provided a good URL in pipeline options... why is it not connecting to my cluster?").
> Setting a {{@Default.String("\[auto\]")}} in FlinkPipelineOptions could be one solution, I guess.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
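What the suggested fix would look like on the options interface, as a hedged sketch (the interface shown is a stand-in for the real FlinkPipelineOptions; the annotation placement is the point):

{code:java}
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface FlinkMasterOptionsSketch extends PipelineOptions {
  // Defaulting the option here means TestFlinkRunner no longer needs to
  // overwrite whatever flinkMaster the user passed on the command line.
  @Description("Address of the Flink master; [auto] picks local or attached execution.")
  @Default.String("[auto]")
  String getFlinkMaster();
  void setFlinkMaster(String value);
}
{code}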