go-elasticsearch: Elastic官方的Go语言客户端(29)

发布于2019-04-21 20:34:12

说明

Elastic官方鼓励在项目中尝试用这个包,但请记住以下几点:

安装

go get安装这个包:

go get -u github.com/elastic/go-elasticsearch

或者将这个包添加到go.mod文件:

require github.com/elastic/go-elasticsearch v0.0.0

或者克隆这个仓库:

git clone https://github.com/elastic/go-elasticsearch.git && cd go-elasticsearch

一个完整的示例:

mkdir my-elasticsearch-app && cd my-elasticsearch-app

cat > go.mod <<-END
  module my-elasticsearch-app

  require github.com/elastic/go-elasticsearch v0.0.0
END

cat > main.go <<-END
  package main

  import (
    "log"

    "github.com/elastic/go-elasticsearch"
  )

  func main() {
    es, _ := elasticsearch.NewDefaultClient()
    log.Println(es.Info())
  }
END

go run main.go

用法

elasticsearch包与另外两个包绑定在一起,esapi用于调用Elasticsearch的API,estransport通过HTTP传输数据。

使用elasticsearch.NewDefaultClient()函数创建带有以下默认设置的客户端:

es, err := elasticsearch.NewDefaultClient()
if err != nil {
  log.Fatalf("Error creating the client: %s", err)
}

res, err := es.Info()
if err != nil {
  log.Fatalf("Error getting response: %s", err)
}

log.Println(res)

// [200 OK] {
//   "name" : "node-1",
//   "cluster_name" : "go-elasticsearch"
// ...

注意:当导出ELASTICSEARCH_URL环境变量时,它将被用作集群端点。

使用elasticsearch.NewClient()函数(仅用作演示)配置该客户端:

cfg := elasticsearch.Config{
  Addresses: []string{
    "http://localhost:9200",
    "http://localhost:9201",
  },
  Transport: &http.Transport{
    MaxIdleConnsPerHost:   10,
    ResponseHeaderTimeout: time.Second,
    DialContext:           (&net.Dialer{Timeout: time.Second}).DialContext,
    TLSClientConfig: &tls.Config{
      MaxVersion:         tls.VersionTLS11,
      InsecureSkipVerify: true,
    },
  },
}

es, err := elasticsearch.NewClient(cfg)
// ...

下面的示例展示了更复杂的用法。它从集群中获取Elasticsearch版本,同时索引几个文档,并使用响应主体周围的一个轻量包装器打印搜索结果。

// $ go run _examples/main.go

package main

import (
  "context"
  "encoding/json"
  "log"
  "strconv"
  "strings"
  "sync"

  "github.com/elastic/go-elasticsearch"
  "github.com/elastic/go-elasticsearch/esapi"
)

func main() {
  log.SetFlags(0)

  var (
    r  map[string]interface{}
    wg sync.WaitGroup
  )

  // Initialize a client with the default settings.
  //
  // An `ELASTICSEARCH_URL` environment variable will be used when exported.
  //
  es, err := elasticsearch.NewDefaultClient()
  if err != nil {
    log.Fatalf("Error creating the client: %s", err)
  }

  // 1. Get cluster info
  //
  res, err := es.Info()
  if err != nil {
    log.Fatalf("Error getting response: %s", err)
  }
  // Deserialize the response into a map.
  if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
    log.Fatalf("Error parsing the response body: %s", err)
  }
  // Print version number.
  log.Printf("~~~~~~~> Elasticsearch %s", r["version"].(map[string]interface{})["number"])

  // 2. Index documents concurrently
  //
  for i, title := range []string{"Test One", "Test Two"} {
    wg.Add(1)

    go func(i int, title string) {
      defer wg.Done()

      // Set up the request object directly.
      req := esapi.IndexRequest{
        Index:      "test",
        DocumentID: strconv.Itoa(i + 1),
        Body:       strings.NewReader(`{"title" : "` + title + `"}`),
        Refresh:    "true",
      }

      // Perform the request with the client.
      res, err := req.Do(context.Background(), es)
      if err != nil {
        log.Fatalf("Error getting response: %s", err)
      }
      defer res.Body.Close()

      if res.IsError() {
        log.Printf("[%s] Error indexing document ID=%d", res.Status(), i+1)
      } else {
        // Deserialize the response into a map.
        var r map[string]interface{}
        if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
          log.Printf("Error parsing the response body: %s", err)
        } else {
          // Print the response status and indexed document version.
          log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
        }
      }
    }(i, title)
  }
  wg.Wait()

  log.Println(strings.Repeat("-", 37))

  // 3. Search for the indexed documents
  //
  // Use the helper methods of the client.
  res, err = es.Search(
    es.Search.WithContext(context.Background()),
    es.Search.WithIndex("test"),
    es.Search.WithBody(strings.NewReader(`{"query" : { "match" : { "title" : "test" } }}`)),
    es.Search.WithTrackTotalHits(true),
    es.Search.WithPretty(),
  )
  if err != nil {
    log.Fatalf("ERROR: %s", err)
  }
  defer res.Body.Close()

  if res.IsError() {
    var e map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
      log.Fatalf("error parsing the response body: %s", err)
    } else {
      // Print the response status and error information.
      log.Fatalf("[%s] %s: %s",
        res.Status(),
        e["error"].(map[string]interface{})["type"],
        e["error"].(map[string]interface{})["reason"],
      )
    }
  }

  if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
    log.Fatalf("Error parsing the response body: %s", err)
  }
  // Print the response status, number of results, and request duration.
  log.Printf(
    "[%s] %d hits; took: %dms",
    res.Status(),
    int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
    int(r["took"].(float64)),
  )
  // Print the ID and document source for each hit.
  for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
    log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])
  }

  log.Println(strings.Repeat("=", 37))
}

// ~~~~~~~> Elasticsearch 7.0.0-SNAPSHOT
// [200 OK] updated; version=1
// [200 OK] updated; version=1
// -------------------------------------
// [200 OK] 2 hits; took: 7ms
//  * ID=1, map[title:Test One]
//  * ID=2, map[title:Test Two]
// =====================================

如上述示例所示,esapi包允许通过两种不同的方式调用Elasticsearch API:通过创建结构(如IndexRequest),并向其传递上下文和客户端来调用其Do()方法,或者通过客户端上可用的函数(如WithIndex())直接调用其上的Search()函数。更多信息请参阅包文档。

estransport包处理与Elasticsearch之间的数据传输。 目前,这个实现只占据很小的空间:它只在已配置的集群端点上进行循环。后续将添加更多功能:重试失败的请求,忽略某些状态代码,自动发现群集中的节点等等。

Examples

_examples文件夹包含许多全面的示例,可帮助你上手使用客户端,包括客户端的配置和自定义,模拟单元测试的传输,将客户端嵌入自定义类型,构建查询,执行请求和解析回应。

许可证

遵循Apache License 2.0版本。

参考链接:

https://github.com/elastic/go-elasticsearch#go-elasticsearch