Were you looking for this? :
https://mail-archives.apache.org/mod_mbox/beam-user/201605.mbox/%3ccagwr7sbr9x7voyuzjhymysb_5_tdugqwudz4sdhop4equ2g...@mail.gmail.com%3E


> Didnt cut it...Exceptions complaining "KafkaConsumer is not safe for
multi-threaded access".

btw, Thomas sent me full thread stack for this exception. I have a fix for
it here https://github.com/apache/incubator-beam/pull/290 . It will be very
helpful if you could check it works in your app.

Raghu.

On Thu, May 5, 2016 at 7:10 AM, amir bahmanyari <[email protected]> wrote:

> Hi Raghu,
> I noticed you replied to this thread yesterday regarding users getting
> affected by this Kafka version difference etc.
> And Google DataFlow libs working fine for Unbound KafkaIO etc.
> I dont see that email in my inbox anymore. I might have accidentally
> deleted it.
> Could you resend it pls? I appreciate it...
> Have a great day.
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Wednesday, May 4, 2016 12:57 PM
>
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> The root cause ended up to be the Kafka version in my lab.
> Kafka  server must be version 9.0+ for the KafkaIO call in the Beam app
> code to populate the PCollection object .
> Thanks Thomas so much for diagnosing that.
> Appreciate all his valuable time he spent with me offline.
>
> Status:
> *Bounded*:
> *Works *when setting either InProcessPipelineRunner or
> DirectPipelineRunner
>
> *Unbounded*:
> *Throws *different exceptions for the different above runners.
> Thomas has the different stacktraces.
>
> At the moment, I have withMaxNumRecords(100) set in my KafkaIO call.
> This causes the call to block till all the 100 records are received, and
> then makes it available to the app to consume it.
> I tried running the p.run() in a while(true) loop &
> setting withMaxNumRecords(100) so I get chunks of 100 records in multiple
> files created by TextIO.
> Didnt cut it...Exceptions complaining "KafkaConsumer is not safe for
> multi-threaded access".
>
> I know Unbounded is the ultimate way to achieve a true real-time streaming.
> Given that Unbounded is not available at the moment, is there a work
> around that makes every single record available immediately to the app?
>
> Thanks everyone again for your valuable help.
> Amir-
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]
> *Sent:* Monday, May 2, 2016 11:41 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> Yeah, JB has it - if that commit (
> https://github.com/apache/incubator-beam/commit/b2b77e380) is in your
> history, the call should compile correctly; if it's not, then the
> InProcessPipelineRunner doesn't implement the appropriate interface, and
> that call won't typecheck (and lead to your compilation failure) - syncing
> to a more recent version should fix the problem.
>
> On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> Thomas meant that you have to checkout after or at this commit:
>
> git checkout b2b77e380
>
> ;)
>
> Regards
> JB
>
> On 05/02/2016 07:52 PM, amir bahmanyari wrote:
>
> Hi Dan,
> Sorry! I honestly dont know what "so long as you're on a commit after
> b2b77e380" means :)))
> If this means there is a specific jar file I need to have on  my path,
> could you point me to a link where I can get the right jar file pls?
> I appreciate i sir.
> Amir
>
>
> ------------------------------------------------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]
> *Sent:* Monday, May 2, 2016 10:02 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> Your calls should work - so long as you're on a commit after b2b77e380
> (when we started implementing PipelineRunner), the
> InProcessPipelineRunner should be a valid argument to
> PipelineOptions#setRunner
>
> As an example, there's the InProcessPipelineRunnerTest
> (
> https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73
> )
>
> On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <[email protected]
> <mailto:[email protected]>> wrote:
>
>     Hi Amir,
>
>     The problem is likely in using DataflowPipelineOptions.class -- this
>     is specific to the Cloud Dataflow service and the
>     DataflowPipelineRunner. Try using just "PipelineOptions".
>
>     Dan
>
>     On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <[email protected]
>     <mailto:[email protected]>> wrote:
>
>         Thanks Dan.
>         I actually had tried it before but got compilation errors at
>         setting the InProcessPipelineRunner  in the PipelineOptions
> object..
>         I appreciate it if you point me to a working sample code.
>         FYI, This is my implementation:
>         import com.google.cloud.dataflow.sdk.options.PipelineOptions;
>         import
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>         DataflowPipelineOptions Myoptions =
>         PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
>         Myoptions.setRunner(InProcessPipelineRunner.class);
>
>         I cannot set runner as InProcessPipelineRunner in the last line:
>         The method setRunner(Class<? extends PipelineRunner<?>>) in the
>         type PipelineOptions is not applicable for the arguments
>         (Class<InProcessPipelineRunner>).
>         Thanks for your help.
>         Amir-
>
>
>
> ------------------------------------------------------------------------
>         *From:* Dan Halperin <[email protected]
>         <mailto:[email protected]>>
>         *To:* [email protected]
>         <mailto:[email protected]>
>         *Sent:* Monday, May 2, 2016 12:23 AM
>         *Subject:* Re: KafkaIO Usage & Sample Code
>
>         On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
>         <[email protected] <mailto:[email protected]>> wrote:
>
>             Hi Amir,
>
>             As Frances suggested, you can use the
>             InProcessPipelineRunner instead of the DirectPipelineRunner
>             to execute your pipeline. (They're both in the codebase,
>             it's just that the Direct runner is the default. Use the
>             --runner command line option.)
>
>
>         Amending: it is relatively unlikely that the issues that we
>         caught in testing would affect you. So it should be safe for
>         your use case to do this -- and definitely safe to at least try
>         it out!
>
>
>             Dan
>
>             On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
>             <[email protected] <mailto:[email protected]>> wrote:
>
>                 Thanks gents
>                 What are our options in the meanwhile?
>                 Cheers
>
>                 Sent from my iPhone
>
>                 On May 2, 2016, at 12:00 AM, Dan Halperin
>                 <[email protected] <mailto:[email protected]>> wrote:
>
>                 On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
>                 <[email protected] <mailto:[email protected]>> wrote:
>
>                     Oh, thanks Frances.
>
>                     I mixed DirectPipelineRunner ("old" local runner),
>                     and InProcessPipelineRunner ("new" local runner) ;)
>
>                     We should remove the DirectPipelineRunner to avoid
>                     confusion. WDYT ?
>
>
>                 We would like to do this soon, but there are some snags.
>
>                 As a preparation step, Thomas swapped the default
>                 runner from Direct to InProcess. (#178
>                 <https://github.com/apache/incubator-beam/pull/178>)
>
>                 However, testing unfortunately exposed some issues
>                 with the InProcess runner. (Actually, I should say
>                 "fortunately" because the tests caught it! Yay!) So we
>                 had to roll it back. (#198
>                 <https://github.com/apache/incubator-beam/pull/198>)
>
>
>                 Once we improve the InProcess runner, we can re-do the
>                 default swap. After the swap, once the tests keep
>                 passing for a few days, we do indeed intend to delete
>                 the current Direct pipeline runner and replace it with
>                 the current InProcess runner.
>
>                 Dan
>
>
>                     Regards
>                     JB
>
>                     On 05/02/2016 03:12 AM, Frances Perry wrote:
>
>                         +Thomas, author of the InProcessPipelineRunner
>
>                         The DirectPipelineRunner doesn't yet support
>                         unbounded PCollections. You
>                         can try using the InProcessPipelineRunner,
>                         which is the re-write of
>                         local execution that provides support for
>                         unbounded PCollections and
>                         better checking against the Beam Model. (We'll
>                         be renaming this to the
>                         DirectPipelineRunner in the near future to
>                         avoid having both as soon as
>                         the functionality of the
>                         InProcessPipelineRunner is complete.)
>
>                         On Sun, May 1, 2016 at 4:38 PM, amir
>                         bahmanyari <[email protected]
>                         <mailto:[email protected]>
>                         <mailto:[email protected]
>
>                         <mailto:[email protected]>>> wrote:
>
>                             Hi JB,
>                             I rebuilt my code with the latest :
>                             kafka-0.1.0-incubating-20160501.070733-11.jar
>
>                         <
> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
> >
>
>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>
>                         <
> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> >
>
>
>                             Tried _without setting withMaxNumRecords()_:
>                             Throws java.lang.IllegalStateException: no
>                         evaluator registered for
>                             Read(UnboundedKafkaSource)
>                             at
>
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>                             at
>
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>                             at
>
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
>                             _With setting ithMaxNumRecords(_), I see
>                         the thread is running, no
>                             exceptions like above, waiting for
>                         incoming Kafka data, but the
>                             method obtaining the data from
>                         processElement(ProcessContext ctx)
>                             never executes.
>                             Therefore, nothing goes into
>                         apply(TextIO.Write.to <http://textio.write.to/> <
> http://textio.write.to/>
>                             <http://TextIO.Write.to
> <http://textio.write.to/>
>                         <http://textio.write.to/
> >>("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
>                             I see Kafka Broker reports my laptop IP
>                         address as getting a
>                             connection to it, OK.
>                             Everything looks OK at the server side.
>                             Doesn't look like its my lucky day.
>                             I appreciate any help/feedback/suggetion.
>                             Cheers
>
>
>
> ------------------------------------------------------------------------
>                             *From:* Jean-Baptiste Onofré
>                         <[email protected] <mailto:[email protected]>
>                         <mailto:[email protected] <mailto:[email protected]>>>
>                             *To:* [email protected]
>                         <mailto:[email protected]>
>                             <mailto:[email protected]
>
>                         <mailto:[email protected]>>
>                             *Sent:* Friday, April 29, 2016 10:36 PM
>                             *Subject:* Re: KafkaIO Usage & Sample Code
>
>
>                             As I said in my previous e-mail, until
>                         recently DirectPipelineRunner
>                             didn't support Unbounded.
>
>                             It's now fixed, so if you take a latest
>                         nightly build, or build master,
>                             it should work.
>
>                             As workaround, you can also limit the
>                         number of message consumed from
>                             Kafka (and so work with bounded).
>
>                             Regards
>                             JB
>
>                             On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>                              > Hi colleagues,
>                              > I am moving this conversation to this
>                         users mailing list as per Max’s
>                              > suggestion.
>                              > Thanks Max.
>                              > Hi JB,
>                              > Hope all is great.
>                              > Is there a resolution to the exception
>                         I sent last night pls?
>                              > When would the sample code to use
>                         KafkaIO be released?
>                              > I really appreciate your valuable time.
>                         Below is the exception
>                             for your
>                              > reference.
>                              > This is how it gets used in my code:
>                              >
>                              >
>
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>                              >
>                              > Have a wonderful weekend.
>                              > Exception in thread "main"
>                         java.lang.IllegalStateException: no
>                             evaluator
>                              > registered for Read(UnboundedKafkaSource)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>                              >        at
>                              >
>
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>                              >        at
>                              >
>
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>                              >        at
>                         org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>                              >        at
>                              >
>
>
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>                              > Kind Regards,
>                              > Amir
>
>
>                             --
>                             Jean-Baptiste Onofré
>                         [email protected]
>                         <mailto:[email protected]>
>                         <mailto:[email protected]
>                         <mailto:[email protected]>>
>                         http://blog.nanthrax.net
>                         <http://blog.nanthrax.net/>
>                         <http://blog.nanthrax.net/>
>                             Talend - http://www.talend.com
>                         <http://www.talend.com/> <http://www.talend.com/>
>
>
>
>
>
>                     --
>                     Jean-Baptiste Onofré
>                     [email protected] <mailto:[email protected]>
>                     http://blog.nanthrax.net <http://blog.nanthrax.net/>
>                     Talend - http://www.talend.com
>                     <http://www.talend.com/>
>
>
>
>
>
>
>
>
>
>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
>
>
>

Reply via email to