Class MCollective::Message
In: lib/mcollective/message.rb
Parent: Object

Container for a message, its headers, agent, collective and other metadata.

Constants

VALIDTYPES = [:message, :request, :direct_request, :reply]

Attributes

agent  [RW] 
collective  [RW] 
discovered_hosts  [RW] 
expected_msgid  [R] 
filter  [RW] 
headers  [RW] 
message  [R] 
msgtime  [R] 
options  [RW] 
payload  [R] 
reply_to  [R] 
request  [R] 
requestid  [RW] 
ttl  [RW] 
type  [R] 
validated  [R] 

Public Class methods

payload                  - the message body without headers etc, just the text
message                  - the original message received from the middleware
options[:base64]         - is the body base64 encoded?
options[:agent]          - the agent the message is for/from
options[:collective]     - the collective it is for/from
options[:headers]        - the message headers
options[:type]           - an indicator of the type of message: :message, :request, :direct_request or :reply
options[:request]        - if this is a reply, this should hold the message we are replying to
options[:filter]         - for requests, the filter to encode into the message
options[:options]        - the normal client options hash
options[:ttl]            - the maximum number of seconds this message can be valid for
options[:expected_msgid] - in the case of replies, the msgid expected in the replies

[Source]

    # File lib/mcollective/message.rb, line 22
    def initialize(payload, message, options = {})
      options = {:base64 => false,
                 :agent => nil,
                 :headers => {},
                 :type => :message,
                 :request => nil,
                 :filter => Util.empty_filter,
                 :options => {},
                 :ttl => 60,
                 :expected_msgid => nil,
                 :collective => nil}.merge(options)

      @payload = payload
      @message = message
      @requestid = nil
      @discovered_hosts = nil
      @reply_to = nil

      @type = options[:type]
      @headers = options[:headers]
      @base64 = options[:base64]
      @filter = options[:filter]
      @expected_msgid = options[:expected_msgid]
      @options = options[:options]

      @ttl = @options[:ttl] || Config.instance.ttl
      @msgtime = 0

      @validated = false

      if options[:request]
        @request = options[:request]
        @agent = request.agent
        @collective = request.collective
        @type = :reply
      else
        @agent = options[:agent]
        @collective = options[:collective]
      end

      base64_decode!
    end
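
The option handling above can be sketched without loading MCollective. The block below is a minimal illustration, not the real class: SketchMessage and DEFAULT_TTL are hypothetical names, and DEFAULT_TTL stands in for Config.instance.ttl. It shows two things the listing does: a message built with :request becomes a :reply and inherits the agent and collective of that request, and the TTL is read from the nested options[:options] hash rather than from the top-level :ttl key.

```ruby
# Hypothetical stand-in that mirrors only the option handling of
# MCollective::Message#initialize; it is not the real class.
class SketchMessage
  attr_reader :agent, :collective, :type, :ttl, :request

  DEFAULT_TTL = 60 # assumption: stands in for Config.instance.ttl

  def initialize(payload, message, options = {})
    # Defaults first, caller-supplied values win on merge
    options = {:agent => nil,
               :type => :message,
               :request => nil,
               :options => {},
               :collective => nil}.merge(options)

    @payload = payload
    @message = message
    @type = options[:type]
    @options = options[:options]

    # As in the listing, the TTL comes from the nested client options hash
    @ttl = @options[:ttl] || DEFAULT_TTL

    if options[:request]
      # A reply takes its agent and collective from the request it answers
      @request = options[:request]
      @agent = @request.agent
      @collective = @request.collective
      @type = :reply
    else
      @agent = options[:agent]
      @collective = options[:collective]
    end
  end
end

request = SketchMessage.new("ping", nil,
                            :agent => "rpcutil",
                            :collective => "mcollective",
                            :type => :request,
                            :options => {:ttl => 30})
reply = SketchMessage.new("pong", nil, :request => request)
```

Here reply.type is :reply even though no :type was passed, and reply.agent is "rpcutil" because it was copied from the request.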

Public Instance methods

[Source]

    # File lib/mcollective/message.rb, line 121
    def base64?
      @base64
    end

[Source]

    # File lib/mcollective/message.rb, line 107
    def base64_decode!
      return unless @base64

      @payload = SSL.base64_decode(@payload)
      @base64 = false
    end

[Source]

    # File lib/mcollective/message.rb, line 114
    def base64_encode!
      return if @base64

      @payload = SSL.base64_encode(@payload)
      @base64 = true
    end
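
The three base64 methods guard on the @base64 flag, so calling either bang method twice is harmless. A minimal sketch of that behavior follows. Base64Payload is a hypothetical stand-in, and Array#pack("m0") and String#unpack1("m0") from core Ruby are used in place of SSL.base64_encode and SSL.base64_decode, whose exact output is not shown on this page.

```ruby
# Hypothetical stand-in for the base64 handling of the message class.
class Base64Payload
  attr_reader :payload

  def initialize(payload, base64 = false)
    @payload = payload
    @base64 = base64
  end

  def base64?
    @base64
  end

  def base64_encode!
    return if @base64 # already encoded, do nothing

    @payload = [@payload].pack("m0") # assumption: replaces SSL.base64_encode
    @base64 = true
  end

  def base64_decode!
    return unless @base64 # already plain, do nothing

    @payload = @payload.unpack1("m0") # assumption: replaces SSL.base64_decode
    @base64 = false
  end
end

body = Base64Payload.new("hello")
body.base64_encode!
body.base64_encode! # second call is a no-op thanks to the flag
encoded = body.payload

body.base64_decode!
decoded = body.payload
```

The flag, not the content of the payload, decides whether any work is done, which is why the constructor can call base64_decode! unconditionally.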

[Source]

    # File lib/mcollective/message.rb, line 197
    def create_reqid
      Digest::MD5.hexdigest("#{Config.instance.identity}-#{Time.now.to_f}-#{agent}-#{collective}")
    end

[Source]

    # File lib/mcollective/message.rb, line 141
    def decode!
      raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)

      @payload = PluginManager["security_plugin"].decodemsg(self)

      if type == :request
        raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
      end

      [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
        instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
      end
    end

[Source]

    # File lib/mcollective/message.rb, line 125
    def encode!
      case type
        when :reply
          raise "Cannot encode a reply message if no request has been associated with it" unless request
          raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid])

          @requestid = request.payload[:requestid]
          @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid])
        when :request, :direct_request
          @requestid = create_reqid
          @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
        else
          raise "Cannot encode #{type} messages"
      end
    end

For reply messages, we expect replies to a previously created message. This stores a hint of that previously sent message id, which other classes such as the security plugins can use to optimize their behavior, for example by ignoring messages not directed at us.

[Source]

    # File lib/mcollective/message.rb, line 102
    def expected_msgid=(msgid)
      raise "Can only store the expected msgid for reply messages" unless @type == :reply
      @expected_msgid = msgid
    end
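
As a rough illustration of how such a hint could be used, the sketch below pairs the guard from the listing with a cheap pre-check. ReplySketch and worth_decoding? are hypothetical names invented for this example; the page does not show how the real security plugins consume expected_msgid.

```ruby
# Hypothetical stand-in showing only the expected_msgid guard.
class ReplySketch
  attr_reader :type, :expected_msgid

  def initialize(type)
    @type = type
    @expected_msgid = nil
  end

  def expected_msgid=(msgid)
    raise "Can only store the expected msgid for reply messages" unless @type == :reply
    @expected_msgid = msgid
  end
end

# Hypothetical check a consumer could run before doing any expensive
# decoding: with no hint stored, everything is considered worth decoding.
def worth_decoding?(message, incoming_requestid)
  message.expected_msgid.nil? || message.expected_msgid == incoming_requestid
end

reply = ReplySketch.new(:reply)
reply.expected_msgid = "abc123"

ours = worth_decoding?(reply, "abc123")
not_ours = worth_decoding?(reply, "zzz999")

# Setting the hint on anything other than a reply raises
rejected = nil
begin
  ReplySketch.new(:request).expected_msgid = "abc123"
rescue RuntimeError => e
  rejected = e.message
end
```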

Publishes a reply message by creating a target name and sending it.

[Source]

    # File lib/mcollective/message.rb, line 179
    def publish
      Timeout.timeout(2) do
        # If we've been specificaly told about hosts that were discovered
        # use that information to do P2P calls if appropriate else just
        # send it as is.
        if @discovered_hosts && Config.instance.direct_addressing
          if @discovered_hosts.size <= Config.instance.direct_addressing_threshold
            @type = :direct_request
            Log.debug("Handling #{requestid} as a direct request")
          end

          PluginManager["connector_plugin"].publish(self)
        else
          PluginManager["connector_plugin"].publish(self)
        end
      end
    end
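
The only decision publish makes before handing the message to the connector is whether to switch the type to :direct_request. That decision can be sketched as a pure function. type_for_publish is a hypothetical helper, and its two keyword arguments stand in for Config.instance.direct_addressing and Config.instance.direct_addressing_threshold; the threshold value of 10 is chosen only for the example.

```ruby
# Hypothetical helper mirroring the type decision in the publish listing.
def type_for_publish(current_type, discovered_hosts, direct_addressing:, threshold:)
  if discovered_hosts && direct_addressing && discovered_hosts.size <= threshold
    :direct_request # few enough hosts to address each one directly
  else
    current_type # leave the message as it was
  end
end

few_hosts = %w[node1.example.com node2.example.com]
many_hosts = Array.new(50) { |i| "node#{i}.example.com" }

small = type_for_publish(:request, few_hosts, direct_addressing: true, threshold: 10)
large = type_for_publish(:request, many_hosts, direct_addressing: true, threshold: 10)
undiscovered = type_for_publish(:request, nil, direct_addressing: true, threshold: 10)
disabled = type_for_publish(:request, few_hosts, direct_addressing: false, threshold: 10)
```

In this sketch only the first call yields :direct_request; the other three leave the type at :request.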

Sets a custom reply-to target for requests. The connector plugin should inspect this when constructing requests and set the corresponding header so that replies go to the custom target; otherwise the connector should do what it usually does.

[Source]

    # File lib/mcollective/message.rb, line 91
    def reply_to=(target)
      raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type)

      @reply_to = target
    end
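
A minimal sketch of the connector side of this contract follows. RequestSketch, reply_header_for and the queue names are all hypothetical; real connector plugins decide for themselves how the header is named and what their default target looks like.

```ruby
# Hypothetical stand-in showing only the reply_to guard.
class RequestSketch
  attr_reader :type, :reply_to

  def initialize(type)
    @type = type
    @reply_to = nil
  end

  def reply_to=(target)
    raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type)

    @reply_to = target
  end
end

# Hypothetical connector logic: prefer the custom target, else fall back
# to whatever the connector would normally use.
def reply_header_for(message, default_target)
  message.reply_to || default_target
end

plain = RequestSketch.new(:request)
custom = RequestSketch.new(:request)
custom.reply_to = "/queue/custom.replies"

plain_target = reply_header_for(plain, "/queue/default.replies")
custom_target = reply_header_for(custom, "/queue/default.replies")

# Replies cannot carry a custom reply target
rejected = nil
begin
  RequestSketch.new(:reply).reply_to = "/queue/custom.replies"
rescue RuntimeError => e
  rejected = e.message
end
```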

Sets the message type to one of the known types. In the case of :direct_request, the list of hosts to communicate with should already have been set with discovered_hosts, else an exception will be raised. This is for extra security: we never want to accidentally send a direct request without a list of hosts, as it might result in a filterless broadcast being sent.

Additionally, you cannot set :direct_request if direct_addressing is not enabled. This forces a workflow that does not lead to mistakes when someone assumes direct_addressing is enabled when it is not.

[Source]

    # File lib/mcollective/message.rb, line 74
    def type=(type)
      if type == :direct_request
        raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing

        unless @discovered_hosts && !@discovered_hosts.empty?
          raise "Can only set type to :direct_request if discovered_hosts have been set"
        end
      end

      raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)

      @type = type
    end
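
The required order of operations (set discovered_hosts first, then the type) can be tried out with a small stand-in. TypedSketch and error_for are hypothetical names, and the constructor argument replaces the Config.instance.direct_addressing lookup so the sketch runs on its own.

```ruby
# Hypothetical stand-in that reproduces the checks in type=.
class TypedSketch
  VALIDTYPES = [:message, :request, :direct_request, :reply]

  attr_accessor :discovered_hosts
  attr_reader :type

  def initialize(direct_addressing)
    @direct_addressing = direct_addressing # stands in for Config.instance.direct_addressing
    @type = :message
    @discovered_hosts = nil
  end

  def type=(type)
    if type == :direct_request
      raise "Direct requests is not enabled using the direct_addressing config option" unless @direct_addressing

      unless @discovered_hosts && !@discovered_hosts.empty?
        raise "Can only set type to :direct_request if discovered_hosts have been set"
      end
    end

    raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)

    @type = type
  end
end

# Small helper for the example: returns the error message, or nil on success
def error_for
  yield
  nil
rescue RuntimeError => e
  e.message
end

msg = TypedSketch.new(true)
no_hosts = error_for { msg.type = :direct_request }

msg.discovered_hosts = ["node1.example.com"]
with_hosts = error_for { msg.type = :direct_request }

disabled = error_for { TypedSketch.new(false).type = :direct_request }
unknown = error_for { msg.type = :bogus }
```

Only the second attempt succeeds: the first fails because no hosts were set, the third because direct addressing is off, and the fourth because :bogus is not in VALIDTYPES.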

Performs validation against the message by checking filters and TTL.

[Source]

    # File lib/mcollective/message.rb, line 156
    def validate
      raise "Can only validate request messages" unless type == :request

      msg_age = Time.now.utc.to_i - msgtime

      if msg_age > ttl
        cid = ""
        cid += payload[:callerid] + "@" if payload.include?(:callerid)
        cid += payload[:senderid]

        if msg_age > ttl
          PluginManager["global_stats"].ttlexpired

          raise(MsgTTLExpired, "Message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
        end
      end

      raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter])

      @validated = true
    end
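
The TTL half of this check is simple arithmetic on epoch seconds, sketched below in isolation. check_ttl! is a hypothetical helper, the MsgTTLExpired class defined here is only a local stand-in for the MCollective exception of the same name, and the filter check is left out because it depends on the security plugin.

```ruby
# Local stand-in for the MCollective exception of the same name
class MsgTTLExpired < RuntimeError; end

# Hypothetical helper: raises when the message is older than its TTL,
# otherwise returns the age in seconds. Times are epoch seconds (UTC).
def check_ttl!(msgtime, ttl, now = Time.now.utc.to_i)
  msg_age = now - msgtime

  # Strict comparison, as in the listing: a message exactly ttl seconds old passes
  raise(MsgTTLExpired, "Message is #{msg_age} seconds old, TTL is #{ttl}") if msg_age > ttl

  msg_age
end

now = 1_000_000 # fixed clock so the example is repeatable

fresh_age = check_ttl!(now - 10, 60, now)
boundary_age = check_ttl!(now - 60, 60, now)

expired = nil
begin
  check_ttl!(now - 61, 60, now)
rescue MsgTTLExpired => e
  expired = e.message
end
```

Because the comparison is msg_age > ttl, the boundary case is accepted and only the 61 second old message raises.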
