go使用es

elasticsearch有官方的golang驱动go-elasticsearch这个项目比较新, 另外一个常用的是 elastic,这两个驱动文档和demo都比较少。es的查询语法也相对复杂,很多查询方式去翻翻它们的test文件才能发现方式。本小节使用elastic做演示,注意不同elasticsearch版本对于不同的client版本,例如elasticsearch 5.5.3对应的client版本为gopkg.in/olivere/elastic.v5。如果这个对应关系错误,很可能程序会出错,这个在https://github.com/olivere/elastic的readme文档也有介绍。 本小节的demo主要基于 elasticsearch 5.5.3,client为gopkg.in/olivere/elastic.v5

go链接es

var client *elastic.Client func init() { var err error client, err = elastic.NewClient(elastic.SetURL("http://localhost:9200")) if err != nil { log.Fatal(err) } }

CURD

测试数据结构体

type Item struct { Id int64 `json:"id"` Appid string `json:"appid"` AppBAutoId string `json:"app_b_auto_id"` }

添加

func add() { //add one item := Item{Id:int64(21),Appid:fmt.Sprintf("app_%d",21),AppBAutoId:fmt.Sprintf("app_%d",21+200)} put, err := client.Index(). Index("es_test"). Type("test"). Id("1"). //这个id也可以指定,不指定的话 es自动生成一个 BodyJson(item). Do(context.Background()) if err != nil { // Handle error panic(err) } fmt.Println(put) //add many bulkRequest := client.Bulk() for i:=0;i<20;i++ { item := Item{Id:int64(i),Appid:fmt.Sprintf("app_%d",i),AppBAutoId:fmt.Sprintf("app_%d",i+200)} bulkRequest.Add(elastic.NewBulkIndexRequest().Index("es_test").Type("test").Doc(item)) } bulkRequest.Do(context.TODO()) }

查找

func find() { //find one get1, err := client.Get(). Index("es_test"). Type("test"). Id("1"). Do(context.Background()) if err != nil { // Handle error panic(err) } if get1.Found { fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type) } var ttyp Item json.Unmarshal(*get1.Source,&ttyp) fmt.Println("item",ttyp) //find many searchResult, err := client.Search(). Index("es_test"). Sort("id", true). Type("test").From(0).Size(100). Do(context.TODO()) if err != nil { panic(err) } if searchResult.Hits.TotalHits >0 { var ttyp Item for _, item := range searchResult.Each(reflect.TypeOf(ttyp)) { t := item.(Item) fmt.Println("item ",t) } } }

更新

func update() { fmt.Println(client.Update().Index("es_test").Type("test").Id("1"). Doc(map[string]interface{}{"appid": "app_23"}).Do(context.TODO())) }

删除

func delete() { fmt.Println(client.Delete().Index("es_test").Type("test").Id("1").Do(context.TODO())) }

统计查询

func agg() { //获取最大的id searchResult, err := client.Search(). Index("es_test").Type("test"). Aggregation("max_id", elastic.NewMaxAggregation().Field("id")).Size(0).Do(context.TODO()) if err != nil { panic(err) } var a map[string]float32 if searchResult != nil { if v, found := searchResult.Aggregations["max_id"]; found { json.Unmarshal([]byte(*v), &a) fmt.Println(a) } } //统计id相同的文档数 searchResult, err = client.Search(). Index("es_test").Type("test"). Aggregation("count", elastic.NewTermsAggregation().Field("id")).Size(0).Do(context.TODO()) if err != nil { panic(err) } if searchResult != nil { if v, found := searchResult.Aggregations["count"]; found { var ar elastic.AggregationBucketKeyItems err := json.Unmarshal(*v, &ar) if err != nil { fmt.Printf("Unmarshal failed: %v\n", err) return } for _, item := range ar.Buckets { fmt.Printf("id :%v: count :%v\n", item.Key, item.DocCount) } } } }

参考资料