Thomas,

We have added the dependencies in pom.xml for the Kafka client API and also
for malhar-kafka. Please let us know if you are referring to some other
dependency that we need to add.

<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>${malhar.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.0-mapr-1602-streams-5.1.0</version>
</dependency>
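
One way to check whether the Kafka client jar actually ended up in the .apa
file is to list the archive entries. Below is a minimal sketch, assuming the
.apa is a plain zip archive with the dependency jars bundled inside it (e.g.
under lib/). The class name ApaJarCheck and the path passed on the command
line are hypothetical and only for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class ApaJarCheck {

    /** Returns the names of all archive entries whose name contains the given fragment. */
    public static List<String> findEntries(String archivePath, String fragment) throws IOException {
        List<String> matches = new ArrayList<>();
        // Assumption: the .apa file can be opened as an ordinary zip archive.
        try (ZipFile zip = new ZipFile(archivePath)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.contains(fragment)) {
                    matches.add(name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java ApaJarCheck <path-to-apa-file>");
            return;
        }
        // Print every bundled entry that mentions "kafka", e.g. the kafka-clients jar.
        for (String name : findEntries(args[0], "kafka")) {
            System.out.println(name);
        }
    }
}
```

If no kafka-clients entry is printed, the client jar is not part of the
package. The check only looks at file names, so it does not tell which
producer classes a bundled jar contains.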


Thanks!!

On Fri, Oct 7, 2016 at 12:04 PM, Thomas Weise <t...@apache.org> wrote:

> It looks like the Kafka API dependency is missing. Can you please check it
> is part of the .apa file?
>
> To your previous question: The records/tuples/objects are moved by the
> Apex engine through the stream from operator to operator. There is nothing
> you need to do beyond connecting the operator ports with addStream when you
> specify the DAG.
>
>
> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
> wrote:
>
>> Thomas,
>>
>> Below is the operator implementation we are trying to run. This operator
>> receives an object of the Tenant class from the upstream operator.
>>
>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>
>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>
>>         Gson gson = new Gson();
>>
>>         @Override
>>         public void process(Tenant tenant) {
>>
>>             try {
>>                 Producer<String, String> producer = getKafkaProducer();
>>                 //ObjectMapper mapper = new ObjectMapper();
>>                 long now = System.currentTimeMillis();
>>                 //Configuration conf = HBaseConfiguration.create();
>>                 //TenantDao dao = new TenantDao(conf);
>>                 //ArrayList<Put> puts = new ArrayList<>();
>>                 if (tenant != null) {
>>                     //Tenant tenant = tenant.next();
>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>                         producer.send(new ProducerRecord<String, String>(
>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>                         //puts.add(dao.mkPut(tenant));
>>                     } else {
>>                         producer.send(new ProducerRecord<String, String>(
>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>
>>                     }
>>                     producer.flush();
>>                 }
>>             }
>>
>>
>> After building the application, it throws an error during launch:
>>
>> An error occurred trying to launch the application. Server message:
>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
>> wrote:
>>
>>> Thomas,
>>>
>>> I was referring to the input from the previous operator.
>>>
>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need
>>> to specify <String, T>? We are asking because we receive an object of a
>>> class type from the previous operator.
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Are you referring to the upstream operator in the DAG or the state of
>>>> the previous application after relaunch? Since the data is stored in MapR
>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>> clarify your question.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> I have a question: when we use
>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>> MapR Streams topic, will it be able to read messages from the previous
>>>>> operator?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> For recovery you need to set the window data manager like so:
>>>>>>
>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>
>>>>>> That will also apply to stateful restart of the entire application
>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>
>>>>>> For cold restart, you would need to consider the property you mention
>>>>>> and decide what is applicable to your use case.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> OK, now I get it. Thanks for the nice explanation!
>>>>>>>
>>>>>>> One more thing: you mentioned checkpointing the offset ranges to
>>>>>>> replay in the same order from Kafka.
>>>>>>>
>>>>>>> Is there any property we need to configure to do that, such as
>>>>>>> setting initialOffset to APPLICATION_OR_LATEST?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>> also call it end-to-end exactly-once). There is no such thing as
>>>>>>>> exactly-once processing in a distributed system. In this case it
>>>>>>>> would rather be "produce exactly-once". Upstream operators, on
>>>>>>>> failure, will recover to checkpointed state and re-process the
>>>>>>>> stream from there. This is at-least-once, the default behavior.
>>>>>>>> Because in the input operator you have configured to replay in the
>>>>>>>> same order from Kafka (this is done by checkpointing the offset
>>>>>>>> ranges), the computation in the DAG is idempotent and the output
>>>>>>>> operator can discard the results that were already published
>>>>>>>> instead of producing duplicates.
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think this is a custom operator implementation that takes care
>>>>>>>>> of exactly-once processing at the output.
>>>>>>>>>
>>>>>>>>> What if any of the previous operators fail? How can we make sure
>>>>>>>>> they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>
>>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>>> the stated assumptions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> In our case we are writing the results back to a MapR Streams
>>>>>>>>>>> topic based on some validations.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Which operators in your application are writing to external
>>>>>>>>>>>> systems?
>>>>>>>>>>>>
>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The
>>>>>>>>>>>> results are written to JDBC. That operator by itself supports
>>>>>>>>>>>> exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>> input), hence there is no need to configure the processing mode at
>>>>>>>>>>>> all.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
