Class | MCollective::Client |
In: |
lib/mcollective/client.rb
|
Parent: | Object |
Helpers for writing clients that can talk to agents, do discovery and so forth
options | [RW] | |
stats | [RW] |
# File lib/mcollective/client.rb, line 6
#
# Sets up a client: loads the configuration (unless it is already
# configured), looks up the connector and security plugins, marks the
# security plugin as client-initiated and connects to the middleware.
#
# configfile - path handed to Config#loadconfig
def initialize(configfile)
  # Plain state first; nothing below reads these during construction.
  @options = nil
  @subscriptions = {}

  @config = Config.instance
  @config.loadconfig(configfile) unless @config.configured

  # Plugin lookup order is kept as connector, then security.
  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]
  @security.initiated_by = :client

  @connection.connect
end
Returns the collective given in the options, or the configured main collective if the options do not specify one
# File lib/mcollective/client.rb, line 22
#
# Name of the collective to address: the :collective entry of @options
# when it is set, otherwise the main collective from the configuration.
def collective
  requested = @options[:collective]
  return @config.main_collective if requested.nil?

  requested
end
Disconnects cleanly from the middleware
# File lib/mcollective/client.rb, line 31
#
# Logs the intent at debug level, then asks the connector plugin to
# disconnect from the middleware.
def disconnect
  Log.debug("Disconnecting from the middleware")

  @connection.disconnect
end
Performs a discovery of nodes matching the filter passed and returns an array of the nodes that replied
An integer limit can be supplied; discovery is then cancelled as soon as the requested number of hosts has replied
# File lib/mcollective/client.rb, line 114
#
# Sends a "ping" to the "discovery" agent and collects the sender id of
# every reply until the timeout expires or +limit+ hosts have answered.
#
# filter  - filter passed through to sendreq
# timeout - seconds to wait for replies (handed to Timeout.timeout)
# limit   - Integer; 0 (the default) means "no limit"
#
# Returns the sorted list of sender ids when the timeout ends discovery,
# or the ids in arrival order when the limit is reached first.
# Raises RuntimeError when limit is not an Integer.
def discover(filter, timeout, limit=0)
  # Fixnum was merged into Integer in Ruby 2.4 and the constant no longer
  # exists on Ruby 3.2+, so the check has to be made against Integer.
  raise "Limit has to be an integer" unless limit.is_a?(Integer)

  hosts = []

  begin
    Timeout.timeout(timeout) do
      reqid = sendreq("ping", "discovery", filter)
      Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")

      loop do
        reply = receive(reqid)
        Log.debug("Got discovery reply from #{reply.payload[:senderid]}")
        hosts << reply.payload[:senderid]

        return hosts if limit > 0 && hosts.size == limit
      end
    end
  rescue Timeout::Error
    # Running out of time is the normal way for discovery to finish.
  ensure
    # Always drop the reply subscription, including on the early return.
    unsubscribe("discovery", :reply)
  end

  hosts.sort
end
Performs a discovery and then sends a request, running the passed block for each response
times = discovered_req("status", "mcollectived", options) {|resp| pp resp }
It returns a hash of statistics (timings, the hosts that responded and the hosts that did not). The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 201
#
# Runs a discovery, sends +body+ to +agent+ and yields the payload of each
# reply, waiting for at most as many replies as hosts were discovered.
#
# body    - request body handed to sendreq
# agent   - name of the agent to address
# options - options hash; falls back to @options when false/nil. Reads
#           :filter, :disctimeout and :timeout.
#
# Returns (and stores in @stats) a hash with :starttime, :discoverytime,
# :blocktime, :totaltime, :responses, :responsesfrom, :noresponsefrom and
# :discovered.
# Raises RuntimeError when discovery finds no hosts. An Interrupt during
# discovery prints a message and exits the process immediately.
def discovered_req(body, agent, options=false)
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

  options = @options unless options

  STDOUT.sync = true

  print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")

  begin
    discovered_hosts = discover(options[:filter], options[:disctimeout])
    discovered = discovered_hosts.size
    hosts_responded = []
    # Starts as the full discovery list; hosts are removed as they reply.
    hosts_not_responded = discovered_hosts

    stat[:discoverytime] = Time.now.to_f - stat[:starttime]

    puts("#{discovered}\n\n")
  rescue Interrupt
    puts("Discovery interrupted.")
    exit!
  end

  raise("No matching clients found") if discovered == 0

  begin
    Timeout.timeout(options[:timeout]) do
      reqid = sendreq(body, agent, options[:filter])

      discovered.times do
        resp = receive(reqid)
        sender = resp.payload[:senderid]

        hosts_responded << sender
        hosts_not_responded.delete(sender)

        yield(resp.payload)
      end
    end
  rescue Interrupt, Timeout::Error
    # Both simply end the wait; the stats report whatever arrived so far.
  ensure
    # Mirror req: release the reply subscription that sendreq created.
    unsubscribe(agent, :reply)
  end

  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded.size
  stat[:responsesfrom] = hosts_responded
  stat[:noresponsefrom] = hosts_not_responded
  stat[:discovered] = discovered

  @stats = stat
end
Prints out the stats returned from req and discovered_req in a nice way
# File lib/mcollective/client.rb, line 255
#
# Prints the statistics hash produced by req / discovered_req.
#
# stats   - hash with :responses, :starttime, :discoverytime, :blocktime,
#           :totaltime and optionally :discovered and :noresponsefrom
# options - options hash; falls back to @options when false/nil. Only
#           :verbose is read.
# caption - heading used for the verbose summary
#
# With :verbose a multi-line summary is printed, otherwise a single
# "Finished processing" line. Hosts listed in :noresponsefrom are then
# printed four per row in right-aligned 30 character columns.
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts(" Nodes: #{stats[:responses]}")
    end

    printf(" Start Time: %s\n", Time.at(stats[:starttime]))
    printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  silent_hosts = stats[:noresponsefrom] || []
  return if silent_hosts.empty?

  puts("\nNo response from:\n")

  # The entries are host names, so rows are formed by position rather
  # than by doing arithmetic on the entry itself.
  silent_hosts.each_slice(4) do |row|
    puts(row.map { |host| format("%30s", host) }.join)
  end
end
Blocking call that waits forever for a message to arrive.
If you give it a requestid, this means you've previously sent a request with that ID and now only want replies that match that ID. In that case the current connection will ignore all messages not directed at it and keep waiting until it finds a matching message.
# File lib/mcollective/client.rb, line 84
#
# Blocks until the connector delivers a reply whose request id equals
# +requestid+, then returns that decoded reply.
#
# Replies that fail security validation, and replies that belong to a
# different request, are logged and skipped by starting over. When the
# MCOLLECTIVE_ANON environment variable is present the sender id in the
# payload is replaced by its MD5 hex digest.
def receive(requestid = nil)
  incoming = @connection.receive
  incoming.type = :reply
  incoming.expected_msgid = requestid

  incoming.decode!

  if ENV.include?("MCOLLECTIVE_ANON")
    incoming.payload[:senderid] = Digest::MD5.hexdigest(incoming.payload[:senderid])
  end

  unless incoming.requestid == requestid
    raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{incoming.requestid}")
  end

  incoming
rescue SecurityValidationFailed
  Log.warn("Ignoring a message that did not pass security validations")
  retry
rescue MsgDoesNotMatchRequestID
  Log.debug("Ignoring a message for some other client")
  retry
end
Sends a request and runs the passed block for each response
times = req("status", "mcollectived", options) {|resp|
pp resp
}
It returns a hash of statistics (timings and the number of responses). The timeout for the run is taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 149
#
# Sends a request and yields the payload of every reply until the timeout
# expires or +waitfor+ replies have been seen.
#
# body    - request body, or a Message; for a Message the agent, options
#           and waitfor (number of its discovered hosts) come from it
# agent   - name of the agent to address
# options - options hash; falls back to @options when false/nil. Reads
#           :timeout and :filter.
# waitfor - stop after this many replies; 0 means wait for the timeout
#
# Returns (and stores in @stats) a hash with :starttime, :discoverytime,
# :blocktime, :totaltime, :responses and :noresponsefrom.
def req(body, agent=nil, options=false, waitfor=0)
  if body.is_a?(Message)
    agent = body.agent
    options = body.options
    # A message without discovery data means "no reply count to wait for".
    waitfor = (body.discovered_hosts || []).size
  end

  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

  options = @options unless options

  STDOUT.sync = true

  hosts_responded = 0

  begin
    Timeout.timeout(options[:timeout]) do
      reqid = sendreq(body, agent, options[:filter])

      loop do
        resp = receive(reqid)

        hosts_responded += 1

        yield(resp.payload)

        break if waitfor != 0 && hosts_responded >= waitfor
      end
    end
  rescue Interrupt, Timeout::Error
    # Both simply end the wait; the stats report whatever arrived so far.
  ensure
    unsubscribe(agent, :reply)
  end

  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []

  @stats = stat
end
Sends a request and returns the generated request id; it doesn't wait for responses and doesn't execute any passed-in code blocks for responses
# File lib/mcollective/client.rb, line 38
#
# Encodes and publishes a request, subscribing to the agent's reply
# target first, and returns the request id without waiting for replies.
#
# msg    - a ready-made Message, or a body from which one is built
# agent  - agent name; replaced by the message's own agent when msg is a
#          Message
# filter - filter used when a new Message is built
def sendreq(msg, agent, filter = {})
  request =
    if msg.is_a?(Message)
      agent = msg.agent
      msg
    else
      ttl = @options[:ttl] || @config.ttl
      built = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
      built.reply_to = @options[:reply_to] if @options[:reply_to]
      built
    end

  request.encode!

  Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")

  # Subscribe before publishing, as the original does.
  subscribe(agent, :reply)

  request.publish

  request.requestid
end
# File lib/mcollective/client.rb, line 59
#
# Subscribes to the +type+ target of +agent+ in the current collective
# unless this client has already recorded a subscription for that agent.
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")

  Util.subscribe(target)
  @subscriptions[agent] = 1
end
# File lib/mcollective/client.rb, line 69
#
# Drops the subscription to the +type+ target of +agent+ in the current
# collective; does nothing when no subscription is recorded for the agent.
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")

  Util.unsubscribe(target)
  @subscriptions.delete(agent)
end