Class: Xfers::Etcd::Client

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/xfers/etcd/client.rb

Overview

Xfers etcd v3 client class. It wraps the etcdv3-ruby gem and provides some advanced features such as Mutex and #watch_forever.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ Client

Create a new etcd client connection

Parameters:

  • endpoints (String)

    the etcd server endpoints, separated by commas

  • allow_reconnect (Boolean)

    allow to reconnect, defaults to true

  • command_timeout (Integer)

    the default global timeout, unit is second, defaults to 120

  • user (String)

    the user name for authentication (if RBAC enabled on server side)

  • password (String)

    the user password for authentication (if RBAC enabled on server side)



24
25
26
27
28
# File 'lib/xfers/etcd/client.rb', line 24

# Create a new etcd client connection.
#
# @param options [Hash] connection options (endpoints, allow_reconnect,
#   command_timeout, user, password) handed to Etcdv3.new unchanged
def initialize(**options)
  # Re-splat the captured keywords. Passing the Hash positionally only works
  # on Ruby 2.x, where a trailing Hash is implicitly converted to keywords;
  # `**options` works whether Etcdv3.new declares keywords or an options hash.
  @client = ::Etcdv3.new(**options)

  super() # hand over to the included MonitorMixin's initializer
end

Instance Attribute Details

#client ⇒ Etcdv3 (readonly)

Raw Etcdv3 instance

Returns:

  • (Etcdv3)


13
14
15
# File 'lib/xfers/etcd/client.rb', line 13

# Raw Etcdv3 instance backing this client.
#
# @return [Etcdv3] the object stored in @client by #initialize
def client
  @client
end

Instance Method Details

#db_size ⇒ Integer

Store size in bytes.

Returns:

  • (Integer)

    the size of the backend database, in bytes.



40
41
42
# File 'lib/xfers/etcd/client.rb', line 40

# Store size in bytes.
#
# @return [Integer] whatever the yielded client reports from #db_size
def db_size
  synchronize { |etcd| etcd.db_size }
end

#del(key) ⇒ Integer

Delete a single key

Parameters:

  • key (String)

    key to delete

Returns:

  • (Integer)

    number of keys deleted



170
171
172
173
174
175
# File 'lib/xfers/etcd/client.rb', line 170

# Delete a single key.
#
# @param key [String] key to delete
# @return [Integer] number of keys deleted
def del(key)
  synchronize do |client|
    self.class.valid_string_argument?("key", key)
    # No options are needed for a single-key delete. The former positional
    # `{}` would be rejected on Ruby 3 by a `del` that declares keyword
    # parameters, so it is simply omitted.
    client.del(key).deleted
  end
end

#del_all ⇒ Integer

Delete all keys

Returns:

  • (Integer)

    number of keys deleted



180
181
182
183
184
# File 'lib/xfers/etcd/client.rb', line 180

# Delete all keys.
#
# Issues one delete whose start key and range_end are both "\0".
#
# @return [Integer] number of keys deleted
def del_all
  synchronize do |etcd|
    response = etcd.del("\0", range_end: "\0")
    response.deleted
  end
end

#del_prefix(key_prefix) ⇒ Integer

Delete a range of keys with a prefix

Parameters:

  • key_prefix (String)

    the key prefix; every key starting with it is deleted

Returns:

  • (Integer)

    number of keys deleted



191
192
193
194
195
196
197
198
199
# File 'lib/xfers/etcd/client.rb', line 191

# Delete a range of keys with a prefix.
#
# @param key_prefix [String] the key prefix
# @return [Integer] number of keys deleted
def del_prefix(key_prefix)
  synchronize do |client|
    self.class.valid_string_argument?("key_prefix", key_prefix)
    # range_end is passed as a keyword (as #del_all already does) instead of
    # a positional Hash, which Ruby 3 no longer converts to keywords.
    client.del(key_prefix, range_end: prefix_range_end(key_prefix)).deleted
  end
end

#del_range(range_start, range_end) ⇒ Integer

Delete a range of keys

Parameters:

  • range_start (String)

    first key in range

  • range_end (String)

    last key in range, exclusive

Returns:

  • (Integer)

    number of keys deleted



207
208
209
210
211
212
213
214
215
216
# File 'lib/xfers/etcd/client.rb', line 207

# Delete a range of keys.
#
# @param range_start [String] first key in range
# @param range_end [String] last key in range, exclusive
# @return [Integer] number of keys deleted
def del_range(range_start, range_end)
  synchronize do |client|
    self.class.valid_string_argument?("range_start", range_start)
    self.class.valid_string_argument?("range_end", range_end)
    # Keyword form instead of a positional Hash: Ruby 3 does not turn a
    # trailing Hash into keywords, so this works for both calling conventions.
    client.del(range_start, range_end: range_end).deleted
  end
end

#exist?(key) ⇒ Boolean

Check if a key exists

Parameters:

  • key (String)

    key

Returns:

  • (Boolean)

    key exists or not



49
50
51
52
53
54
55
# File 'lib/xfers/etcd/client.rb', line 49

# Check whether a key exists.
#
# Performs a count-only get and reports whether the count is above zero.
# A nil response (or nil count) is treated as "does not exist".
#
# @param key [String] key
# @return [Boolean] key exists or not
def exist?(key)
  synchronize do |etcd|
    self.class.valid_string_argument?("key", key)
    response = etcd.get(key, { count_only: true })
    hits = response&.count
    hits.nil? ? false : hits > 0
  end
end

#get(key) ⇒ Mvccpb::KeyValue

Get the key-value object of a key from etcd

Parameters:

  • key (String)

    key to get

Returns:

  • (Mvccpb::KeyValue)

    the value of key



62
63
64
65
66
67
# File 'lib/xfers/etcd/client.rb', line 62

# Get the key-value object of a key from etcd.
#
# @param key [String] key to get
# @return [Mvccpb::KeyValue] first entry of the response's kvs
#   (nil when kvs is empty)
def get(key)
  synchronize do |etcd|
    self.class.valid_string_argument?("key", key)
    response = etcd.get(key)
    response.kvs.first
  end
end

#get_all ⇒ Array<Mvccpb::KeyValue>

Get all key-value objects currently stored

Returns:

  • (Array<Mvccpb::KeyValue>)

    sequence of key-value object



72
73
74
75
76
# File 'lib/xfers/etcd/client.rb', line 72

# Get all key-value objects currently stored.
#
# Issues one get whose start key and range_end are both "\0".
#
# @return [Array<Mvccpb::KeyValue>] sequence of key-value objects
def get_all
  synchronize do |etcd|
    response = etcd.get("\0", range_end: "\0")
    response.kvs
  end
end

#get_prefix(key_prefix, keys_only: false, sort_target: :key, sort_order: :none) ⇒ Array<Mvccpb::KeyValue>

Get a range of key-value objects with the prefix

Parameters:

  • key_prefix (String)

    the key prefix

  • keys_only (Boolean) (defaults to: false)

    returns only the keys.

  • sort_target (Symbol) (defaults to: :key)

    sort target, possible values: :key, :version, :create, :mode, :value

  • sort_order (Symbol) (defaults to: :none)

    sort order, possible values: :none, :ascend, :descend

Returns:

  • (Array<Mvccpb::KeyValue>)

    sequence of key-value objects



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/xfers/etcd/client.rb', line 86

# Get a range of key-value objects with the prefix.
#
# @param key_prefix [String] the key prefix
# @param keys_only [Boolean] returns only the keys
# @param sort_target [Symbol] sort target, possible values:
#   :key, :version, :create, :mode, :value
# @param sort_order [Symbol] sort order, possible values:
#   :none, :ascend, :descend
# @return [Array<Mvccpb::KeyValue>] sequence of key-value objects
def get_prefix(key_prefix, keys_only: false, sort_target: :key, sort_order: :none)
  synchronize do |etcd|
    self.class.valid_string_argument?("key_prefix", key_prefix)
    response = etcd.get(
      key_prefix,
      {
        range_end: prefix_range_end(key_prefix),
        keys_only: keys_only,
        sort_target: sort_target,
        sort_order: sort_order,
      }
    )
    response.kvs
  end
end

#get_prefix_count(key_prefix) ⇒ Integer

Get the count of keys with the prefix

Parameters:

  • key_prefix (String)

    the key prefix

Returns:

  • (Integer)

    the count of keys with the prefix



103
104
105
106
107
108
# File 'lib/xfers/etcd/client.rb', line 103

# Get the count of keys with the prefix.
#
# @param key_prefix [String] the key prefix
# @return [Integer] the count of keys with the prefix
def get_prefix_count(key_prefix)
  synchronize do |etcd|
    self.class.valid_string_argument?("key_prefix", key_prefix)
    query = { range_end: prefix_range_end(key_prefix), count_only: true }
    response = etcd.get(key_prefix, query)
    response.count
  end
end

#get_range(range_start, range_end, keys_only: false, sort_target: :key, sort_order: :none) ⇒ Array<Mvccpb::KeyValue>

Get a range of key-value objects

Parameters:

  • range_start (String)

    first key in range

  • range_end (String)

    last key in range, exclusive

  • keys_only (Boolean) (defaults to: false)

    returns only the keys.

  • sort_target (Symbol) (defaults to: :key)

    sort target, possible values: :key, :version, :create, :mode, :value

  • sort_order (Symbol) (defaults to: :none)

    sort order, possible values: :none, :ascend, :descend

Returns:

  • (Array<Mvccpb::KeyValue>)

    sequence of key-value objects



119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/xfers/etcd/client.rb', line 119

# Get a range of key-value objects.
#
# @param range_start [String] first key in range
# @param range_end [String] last key in range, exclusive
# @param keys_only [Boolean] returns only the keys
# @param sort_target [Symbol] sort target, possible values:
#   :key, :version, :create, :mode, :value
# @param sort_order [Symbol] sort order, possible values:
#   :none, :ascend, :descend
# @return [Array<Mvccpb::KeyValue>] sequence of key-value objects
def get_range(range_start, range_end, keys_only: false, sort_target: :key, sort_order: :none)
  synchronize do |etcd|
    # Validate both bounds, start first, before touching the server.
    { "range_start" => range_start, "range_end" => range_end }.each do |label, bound|
      self.class.valid_string_argument?(label, bound)
    end
    response = etcd.get(
      range_start,
      {
        range_end: range_end,
        keys_only: keys_only,
        sort_target: sort_target,
        sort_order: sort_order,
      }
    )
    response.kvs
  end
end

#get_range_count(range_start, range_end) ⇒ Integer

Get the count of key range

Parameters:

  • range_start (String)

    first key in range

  • range_end (String)

    last key in range, exclusive

Returns:

  • (Integer)

    the count of key range



138
139
140
141
142
143
144
# File 'lib/xfers/etcd/client.rb', line 138

# Get the count of a key range.
#
# @param range_start [String] first key in range
# @param range_end [String] last key in range, exclusive
# @return [Integer] the count of key range
def get_range_count(range_start, range_end)
  synchronize do |etcd|
    validator = self.class
    validator.valid_string_argument?("range_start", range_start)
    validator.valid_string_argument?("range_end", range_end)
    response = etcd.get(range_start, { range_end: range_end, count_only: true })
    response.count
  end
end

#lease_grant(ttl) ⇒ Integer

Create a new lease

Parameters:

  • ttl (Integer)

    the advisory time-to-live in seconds

Returns:

  • (Integer)

    lease id



223
224
225
226
227
228
# File 'lib/xfers/etcd/client.rb', line 223

# Create a new lease.
#
# @param ttl [Integer] the advisory time-to-live in seconds
# @return [Integer] lease id, read from the response's "ID" entry
def lease_grant(ttl)
  synchronize do |etcd|
    self.class.valid_ttl?(ttl)
    granted = etcd.lease_grant(ttl)
    granted["ID"]
  end
end

#lease_keep_alive_once(lease_id) ⇒ Integer

Keep the lease alive

Parameters:

  • lease_id (Integer)

    the lease ID for the lease

Returns:

  • (Integer)

    the remaining TTL in seconds for the lease



260
261
262
263
264
265
# File 'lib/xfers/etcd/client.rb', line 260

# Keep the lease alive.
#
# @param lease_id [Integer] the lease ID for the lease
# @return [Integer] the remaining TTL in seconds for the lease, read from
#   the response's "TTL" entry
def lease_keep_alive_once(lease_id)
  synchronize do |etcd|
    self.class.valid_lease_id?(lease_id)
    refreshed = etcd.lease_keep_alive_once(lease_id)
    refreshed["TTL"]
  end
end

#lease_revoke(lease_id)

This method returns an undefined value.

Revoke a lease

Parameters:

  • lease_id (Integer)

    the lease ID for the lease



235
236
237
238
239
240
241
# File 'lib/xfers/etcd/client.rb', line 235

# Revoke a lease.
#
# The client's response is discarded; the method always yields nil from the
# synchronized section.
#
# @param lease_id [Integer] the lease ID for the lease
# @return [void]
def lease_revoke(lease_id)
  synchronize do |etcd|
    self.class.valid_lease_id?(lease_id)
    etcd.lease_revoke(lease_id)
    next nil
  end
end

#lease_ttl(lease_id) ⇒ Integer

Query the TTL of lease

Parameters:

  • lease_id (Integer)

    the lease ID for the lease

Returns:

  • (Integer)

    the remaining TTL in seconds for the lease



248
249
250
251
252
253
# File 'lib/xfers/etcd/client.rb', line 248

# Query the TTL of a lease.
#
# @param lease_id [Integer] the lease ID for the lease
# @return [Integer] the remaining TTL in seconds for the lease, read from
#   the response's "TTL" entry
def lease_ttl(lease_id)
  synchronize do |etcd|
    self.class.valid_lease_id?(lease_id)
    info = etcd.lease_ttl(lease_id)
    info["TTL"]
  end
end

#mutex_new(name, ttl: 60) ⇒ Xfers::Etcd::Mutex

Create a new mutex instance

Parameters:

  • name (String)

    the mutex name; different mutex instances created with the same name are treated as the same mutex

  • ttl (Integer) (defaults to: 60)

    the time-to-live in seconds applied when the mutex is locked; the lock will be released after this time elapses unless refreshed.

Returns:



274
275
276
277
278
# File 'lib/xfers/etcd/client.rb', line 274

# Create a new mutex instance bound to this client.
#
# @param name [String] the mutex name
# @param ttl [Integer] the time-to-live in seconds handed to the mutex
# @return [Xfers::Etcd::Mutex] mutex built with this client as its conn
def mutex_new(name, ttl: 60)
  validator = self.class
  validator.valid_string_argument?("name", name)
  validator.valid_ttl?(ttl)
  Mutex.new(name, ttl: ttl, conn: self)
end

#put(key, value, ttl: nil, lease: nil)

This method returns an undefined value.

Set a value to specific key

Parameters:

  • key (String)

    key name

  • value (String)

    the value of key

  • ttl (Integer) (defaults to: nil)

    the advisory time-to-live in seconds, defaults to live forever

  • lease (Integer) (defaults to: nil)

    lease attached to the key, if ttl and lease are both provided, it will use ttl



154
155
156
157
158
159
160
161
162
163
# File 'lib/xfers/etcd/client.rb', line 154

# Set a value to a specific key.
#
# The value is stringified with #to_s before being stored. When ttl is
# usable (non-nil and responds to #to_i) a fresh lease is granted for it and
# takes precedence over any lease passed in.
#
# @param key [String] key name
# @param value [String] the value of key
# @param ttl [Integer] the advisory time-to-live in seconds, defaults to
#   live forever
# @param lease [Integer] lease attached to the key; ignored when ttl is given
# @return [void]
def put(key, value, ttl: nil, lease: nil)
  synchronize do |etcd|
    self.class.valid_string_argument?("key", key)
    put_opts = {}
    if ttl&.respond_to?(:to_i)
      put_opts[:lease] = lease_grant(ttl)
    elsif !lease.nil?
      put_opts[:lease] = lease
    end
    etcd.put(key, value.to_s, put_opts)
    next nil
  end
end

#transaction(timeout: nil) {|txn| ... } ⇒ Etcdserverpb::TxnResponse

Transaction

Yield Parameters:

Returns:

  • (Etcdserverpb::TxnResponse)


358
359
360
361
362
# File 'lib/xfers/etcd/client.rb', line 358

# Run a transaction, forwarding the timeout and the caller's block to the
# client's #transaction.
#
# @param timeout [Integer, nil] forwarded unchanged as the timeout keyword
# @yieldparam txn the object the client passes to the block
# @return [Etcdserverpb::TxnResponse]
def transaction(timeout: nil, &block)
  synchronize { |etcd| etcd.transaction(timeout: timeout, &block) }
end

#version ⇒ String

Version of Etcd running on member

Returns:

  • (String)

    version string



33
34
35
# File 'lib/xfers/etcd/client.rb', line 33

# Version of etcd running on the member.
#
# @return [String] whatever the yielded client reports from #version
def version
  synchronize { |etcd| etcd.version }
end

#watch(key, timeout: nil, start_revision: nil) {|events| ... } ⇒ Array<Mvccpb::Event>, Nil

Watch for changes on a key

Parameters:

  • key (String)

    the key to register for watching

  • timeout (Integer) (defaults to: nil)

    the maximum waiting time in seconds, defaults to the :command_timeout option

  • start_revision (Integer) (defaults to: nil)

    the revision for where to inclusively begin watching

Yield Parameters:

  • events (Array<Mvccpb::Event>)

    a list of new events in sequence

Returns:

  • (Array<Mvccpb::Event>, Nil)

    return events when event arrives if no given block, Nil when timed out or has given block



289
290
291
292
293
294
295
296
# File 'lib/xfers/etcd/client.rb', line 289

# Watch for changes on a key.
#
# @param key [String] the key to register for watching
# @param timeout [Integer, nil] forwarded unchanged as the timeout keyword
# @param start_revision [Integer, nil] the revision for where to inclusively
#   begin watching
# @yieldparam events [Array<Mvccpb::Event>] a list of new events in sequence
# @return [Array<Mvccpb::Event>, nil] the client's watch result, or nil when
#   GRPC::DeadlineExceeded is raised
def watch(key, timeout: nil, start_revision: nil, &block)
  synchronize do |etcd|
    begin
      self.class.valid_string_argument?("key", key)
      etcd.watch(key, timeout: timeout, start_revision: start_revision, &block)
    rescue GRPC::DeadlineExceeded
      # A deadline expiry is reported as "nothing happened", not as an error.
      nil
    end
  end
end

#watch_forever(key, start_revision: nil) {|events| ... }

This method returns an undefined value.

Watch forever for changes on a key

Parameters:

  • key (String)

    the key to register for watching

  • start_revision (Integer) (defaults to: nil)

    the revision for where to inclusively begin watching

Yield Parameters:

  • events (Array<Mvccpb::Event>)

    a list of new events in sequence



306
307
308
309
310
311
312
313
314
315
316
# File 'lib/xfers/etcd/client.rb', line 306

# Watch forever for changes on a key.
#
# Re-issues a 60-second watch every time the previous one returns or ends
# with GRPC::DeadlineExceeded.
#
# NOTE(review): the loop runs entirely inside the synchronize block —
# confirm other threads are not expected to share this client while watching.
# NOTE(review): every retry passes the same start_revision — confirm that
# re-watching from that revision is intended.
#
# @param key [String] the key to register for watching
# @param start_revision [Integer, nil] the revision for where to inclusively
#   begin watching
# @yieldparam events [Array<Mvccpb::Event>] a list of new events in sequence
# @return [void]
def watch_forever(key, start_revision: nil, &block)
  synchronize do |etcd|
    self.class.valid_string_argument?("key", key)
    loop do
      begin
        etcd.watch(key, timeout: 60, start_revision: start_revision, &block)
      rescue GRPC::DeadlineExceeded
        next
      end
    end
  end
  nil
end

#watch_prefix(key_prefix, timeout: nil, start_revision: nil) {|events| ... } ⇒ Array<Mvccpb::Event>, Nil

Watch for changes on a specified key prefix

Parameters:

  • key_prefix (String)

    the key prefix to register for watching

  • timeout (Integer) (defaults to: nil)

    the maximum waiting time in seconds, defaults to the :command_timeout option

  • start_revision (Integer) (defaults to: nil)

    the revision for where to inclusively begin watching

Yield Parameters:

  • events (Array<Mvccpb::Event>)

    a list of new events in sequence

Returns:

  • (Array<Mvccpb::Event>, Nil)

    return events when event arrives if no given block, Nil when timed out or has given block



327
328
329
330
331
332
333
334
# File 'lib/xfers/etcd/client.rb', line 327

# Watch for changes on a specified key prefix.
#
# @param key_prefix [String] the key prefix to watch
# @param timeout [Integer, nil] forwarded unchanged as the timeout keyword
# @param start_revision [Integer, nil] the revision for where to inclusively
#   begin watching
# @yieldparam events [Array<Mvccpb::Event>] a list of new events in sequence
# @return [Array<Mvccpb::Event>, nil] the client's watch result, or nil when
#   GRPC::DeadlineExceeded is raised
def watch_prefix(key_prefix, timeout: nil, start_revision: nil, &block)
  synchronize do |etcd|
    begin
      self.class.valid_string_argument?("key_prefix", key_prefix)
      etcd.watch(
        key_prefix,
        range_end: prefix_range_end(key_prefix),
        timeout: timeout,
        start_revision: start_revision,
        &block
      )
    rescue GRPC::DeadlineExceeded
      # A deadline expiry is reported as "nothing happened", not as an error.
      nil
    end
  end
end

#watch_prefix_forever(key_prefix, start_revision: nil, &block)

This method returns an undefined value.

Watch forever for changes on a specified key prefix

Parameters:

  • key_prefix (String)

    the key prefix to register for watching

  • start_revision (Integer) (defaults to: nil)

    the revision for where to inclusively begin watching



342
343
344
345
346
347
348
349
350
351
# File 'lib/xfers/etcd/client.rb', line 342

# Watch forever for changes on a specified key prefix.
#
# Re-issues a 120-second watch every time the previous one returns or ends
# with GRPC::DeadlineExceeded.
#
# NOTE(review): the loop runs entirely inside the synchronize block —
# confirm other threads are not expected to share this client while watching.
# NOTE(review): every retry passes the same start_revision — confirm that
# re-watching from that revision is intended.
#
# @param key_prefix [String] the key prefix to register for watching
# @param start_revision [Integer, nil] the revision for where to inclusively
#   begin watching
# @yieldparam events the events the client passes to the block
# @return [void]
def watch_prefix_forever(key_prefix, start_revision: nil, &block)
  synchronize do |etcd|
    self.class.valid_string_argument?("key_prefix", key_prefix)
    loop do
      begin
        etcd.watch(
          key_prefix,
          range_end: prefix_range_end(key_prefix),
          timeout: 120,
          start_revision: start_revision,
          &block
        )
      rescue GRPC::DeadlineExceeded
        next
      end
    end
  end
end