Regarding the 'unknown' error, it typically appears if there are no fields defined on the sink/source tap. Without any field definition, we can't extract the field value.
We could use numbers but since the tuple order can easily change, is 
better/safer to use the names.

Regarding configuration clashes, we could introduce a dedicated Properties object per Tap or potentially a var arg (for lose declaration) - unsure yet on how this will translate to the configuration of the running job (as a Tap/Input/Output format we can't touch the configuration itself since we're called after the 'job' has already started; any changes that we make are local).

Can you please raise an issue to make sure we keep track of this?

If you can't fix the 'unknown' problem, please try and translate it into proper Cascading and raise an issue for it as well.

Thanks!

On 21/02/2014 1:46 PM, Antonios Chalkiopoulos wrote:
I'm using *1.3.0.M2 & Scalding 0.9.0rc4*

I'm overriding the config object and set the es.mapping.id to "number"

import com.twitter.scalding.{Job, Args}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

class JobBase(args: Args) extends Job(args) {
   // Overide JobConfig
   override def config : Map[AnyRef,AnyRef] = {
     super.config ++ Map (ConfigurationOptions.ES_MAPPING_ID -> "number") ++ 
Map(ConfigurationOptions.ES_WRITE_OPERATION
-> "index")
   }
}

My Scalding Job looks like that

class ElasticSearchUpdateIndexes(args: Args) extends JobBase(args) {

   // Some data to push into elastic-search
   val someData = List(
     ("1","product1", "description1"),
     ("2","product2", "description2"),
     ("3","product3", "description3"))

   val indexNewDataInElasticSearch =
     IterableSource[(String,String, String)](someData, ('number, 'product, 
'description))
       .write(ElasticSearchSource("localhost", 9200,"index_es/type_es"))
}

And the wrapper that i'm trying to implement and contribute to Scalding for the 
ElasticSearchSource currently looks like
this:

case class ElasticSearchSource(
    es_host :String="localhost",
    es_port :Int   = 9200,
    es_resource:String="scalding_index/type",
    es_fields : Fields = Fields.ALL)
   extends Source {

   def createLocalTap: Tap[_, _, _] =
     new EsTap(es_host, es_port, es_resource,"",es_fields)

   override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, 
_, _] = {
     mode match {
       case Local(_) => {
         createLocalTap
       }
     }
   }
}

My problem is that once i introduce :

++ Map (ConfigurationOptions.ES_MAPPING_ID -> "number")

I'm getting

cascading.tuple.TupleException: unable to sink into output identifier: 'unknown'

My second concern is around the usage of the ConfigurationOptions.ES_MAPPING_ID 
as part of the JobConfiguration
I understand the benefits of that approach , for Hive/Pig, but I think for 
Cascading/Scalding - having a single property
is inefficient

What i would ideally be able to do in Scalding is the following:

val productSales = data
    .filterProductsBoughtWithOffer("summer-offer-14")
    .project('productID, 'customerID, 'quantity)
    .write(ElasticSearchSource("localhost", 9200,"offers/summer-offer-14"), 
'productID)  // productID is the em.mapping.id
    .joinWithSmaller('customerID -> 'customerID, customerData)
    .write(ElasticSearchSource("localhost", 9200,"customers/got-offer"), 
'customerID)    // customerID is the em.mapping.id

So i would like within a Single Job to have multiple elastic-search sources & 
sinks.
My understanding at the moment is that elasticsearch-hadoop will not allow me 
to configure all sources..

Anyhow i'm just looking for some help in implementing this capability in 
Scalding ..
Any help appreciated

Antonios

--
You received this message because you are subscribed to the Google Groups 
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to
elasticsearch+unsubscr...@googlegroups.com.
To view this discussion on the web visit
https://groups.google.com/d/msgid/elasticsearch/2e78f7d6-7142-4225-8afa-69f29e509048%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

--
Costin

--
You received this message because you are subscribed to the Google Groups 
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to elasticsearch+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/elasticsearch/53073FC2.3060104%40gmail.com.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to