Class: StepperMotor::Journey
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - StepperMotor::Journey
- Defined in: lib/stepper_motor/journey.rb
Overview
A Journey is the main building block of StepperMotor. You create a journey to guide a particular model
(“hero”) through a sequence of steps. Any of your models can be the hero, and a hero can have multiple Journeys.
To create your own Journey, subclass the StepperMotor::Journey class and define your steps. For example, a drip mail
campaign can look like this:
class ResubscribeCampaign < StepperMotor::Journey
  step do
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    hero.close_account!
  end
end
Creating a record for the Journey (just using create!) will instantly send your hero on their way:
ResubscribeCampaign.create!(hero: current_account)
To stop the journey forcibly, delete it from your database - or call cancel! within any of the steps.
Constant Summary
- STATES = %w[ready performing canceled finished]
Class Method Summary
- .lookup_step_definition(by_step_name) ⇒ StepperMotor::Step?
  Returns the Step object for a named step.
- .step(name = nil, wait: nil, after: nil, &blk) ⇒ Object
  Defines a step in the journey.
Instance Method Summary
- #after_locking_for_step(step_name) ⇒ Object
- #after_step_completes(step_name) ⇒ Object
- #before_step_starts(step_name) ⇒ Object
- #cancel! ⇒ Object
  A convenient way to end a hero’s journey.
- #logger ⇒ Object
- #lookup_step_definition(by_step_name) ⇒ Object
  Alias for the class method, for brevity.
- #perform_next_step! ⇒ void
  Performs the next step in the journey.
- #reattempt!(wait: nil) ⇒ Object
  Inside a step it is possible to ask StepperMotor to retry the step at a later point in time.
- #schedule! ⇒ Object
- #set_next_step_and_enqueue(next_step_definition) ⇒ Object
- #step_definitions ⇒ Object
  Alias for the class attribute, for brevity.
- #time_remaining_until_final_step ⇒ ActiveSupport::Duration
- #to_global_id ⇒ Object
Class Method Details
.lookup_step_definition(by_step_name) ⇒ StepperMotor::Step?
Returns the Step object for a named step. This is used when performing a step, but can also
be useful in other contexts.
# File 'lib/stepper_motor/journey.rb', line 93
def self.lookup_step_definition(by_step_name)
  step_definitions.find { |d| d.name.to_s == by_step_name.to_s }
end
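Since both sides of the comparison go through to_s, a Symbol and a String should identify the same step, and an unknown name yields nil. A minimal standalone sketch of that behavior follows; SketchStep and STEP_DEFINITIONS are hypothetical stand-ins for StepperMotor::Step and the class attribute, used only so the snippet runs without the gem.

```ruby
# SketchStep is a hypothetical stand-in for StepperMotor::Step (illustration only).
SketchStep = Struct.new(:name, :wait, keyword_init: true)

# Hypothetical stand-in for the step_definitions class attribute.
STEP_DEFINITIONS = [
  SketchStep.new(name: "welcome", wait: 0),
  SketchStep.new(name: "reminder", wait: 3 * 86_400)
].freeze

# Mirrors the lookup shown above: both sides are compared as strings.
def lookup_step_definition(by_step_name)
  STEP_DEFINITIONS.find { |d| d.name.to_s == by_step_name.to_s }
end

by_symbol = lookup_step_definition(:reminder)
by_string = lookup_step_definition("reminder")
missing = lookup_step_definition(:unknown) # nil when no step has that name
```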
.step(name = nil, wait: nil, after: nil, &blk) ⇒ Object
Defines a step in the journey. Steps are stacked top to bottom and get performed in sequence.
# File 'lib/stepper_motor/journey.rb', line 61
def self.step(name = nil, wait: nil, after: nil, &blk)
  wait = if wait && after
    raise StepConfigurationError, "Either wait: or after: can be specified, but not both"
  elsif !wait && !after
    0
  elsif after
    accumulated = step_definitions.map(&:wait).sum
    after - accumulated
  else
    wait
  end
  raise StepConfigurationError, "wait: cannot be negative, but computed was #{wait}s" if wait.negative?
  name ||= "step_%d" % (step_definitions.length + 1)
  name = name.to_s

  known_step_names = step_definitions.map(&:name)
  raise StepConfigurationError, "Step named #{name.inspect} already defined" if known_step_names.include?(name)

  # Create the step definition
  step_definition = StepperMotor::Step.new(name: name, wait: wait, seq: step_definitions.length, &blk)

  # As per Rails docs: you need to be aware when using class_attribute with mutable structures
  # as Array or Hash. In such cases, you don’t want to do changes in place. Instead use setters.
  # See https://apidock.com/rails/v7.1.3.2/Class/class_attribute
  self.step_definitions = step_definitions + [step_definition]
end
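Judging by the source above, wait: is a delay relative to the preceding step, while after: is an offset from the start of the journey that gets converted into a per-step wait by subtracting the waits accumulated so far. The following standalone sketch reproduces only that arithmetic, in plain seconds. resolve_wait, DAY, and the local StepConfigurationError class are hypothetical names introduced for illustration; they are not part of StepperMotor.

```ruby
# Standalone sketch of the wait:/after: arithmetic; not the library's own code path.
# This error class is a local stand-in so the snippet runs without the gem.
class StepConfigurationError < ArgumentError; end

# Returns the per-step wait in seconds, given the waits of the steps defined so far.
def resolve_wait(previous_waits, wait: nil, after: nil)
  resolved = if wait && after
    raise StepConfigurationError, "Either wait: or after: can be specified, but not both"
  elsif !wait && !after
    0 # neither given: the step runs immediately after the previous one
  elsif after
    after - previous_waits.sum # offset from journey start minus accumulated waits
  else
    wait
  end
  raise StepConfigurationError, "wait: cannot be negative, but computed was #{resolved}s" if resolved.negative?
  resolved
end

DAY = 86_400
waits = []
waits << resolve_wait(waits)                  # first step, no delay
waits << resolve_wait(waits, wait: 3 * DAY)   # 3 days after the first step
waits << resolve_wait(waits, after: 7 * DAY)  # 7 days after the journey starts
```

In this sketch the third step ends up with a wait of 4 days, because 3 of the 7 days have already been accounted for by the second step. An after: value smaller than the accumulated waits would produce a negative wait and raise.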
Instance Method Details
#after_locking_for_step(step_name) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 255
def after_locking_for_step(step_name)
end
#after_step_completes(step_name) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 261
def after_step_completes(step_name)
end
#before_step_starts(step_name) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 258
def before_step_starts(step_name)
end
#cancel! ⇒ Object
A convenient way to end a hero’s journey. Imagine you enter a step where you are inviting a user
to rejoin the platform, and are just about to send them an email - but they have already joined. You
can therefore cancel their journey. Canceling bails you out of the step-defined block and sets the
journey record to the canceled state.
Calling cancel! will abort the execution of the current step.
# File 'lib/stepper_motor/journey.rb', line 116
def cancel!
  canceled!
  throw :abort_step
end
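The early exit relies on Ruby's catch/throw: the step block is run inside catch(:abort_step), so throw :abort_step unwinds out of the block without raising an exception. The sketch below shows that mechanism in isolation. CancelSketch is a hypothetical stand-in class with an in-memory state, not StepperMotor::Journey, and nothing in it touches a database.

```ruby
# CancelSketch is a hypothetical stand-in, not StepperMotor::Journey.
class CancelSketch
  attr_reader :state, :log

  def initialize
    @state = "ready"
    @log = []
  end

  # Marks the journey as canceled, then unwinds to the nearest catch(:abort_step).
  def cancel!
    @state = "canceled"
    throw :abort_step
  end

  # Runs the step block with the journey as self, as instance_exec does in the source above.
  def perform(&step_block)
    catch(:abort_step) do
      instance_exec(&step_block)
    end
    @log << "after step, state=#{@state}"
  end
end

journey = CancelSketch.new
hero_active = true
journey.perform do
  cancel! if hero_active
  log << "email sent" # not reached when the hero is already active
end
```

Code placed after cancel! in the step block does not run, while the bookkeeping that follows the catch block still does.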
#logger ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 244
def logger
  if (logger_from_parent = super)
    tag = [self.class.to_s, to_param].join(":")
    tag << " at " << @current_step_definition.name if @current_step_definition
    logger_from_parent.tagged(tag)
  else
    # Furnish a "null logger"
    ActiveSupport::Logger.new(nil)
  end
end
#lookup_step_definition(by_step_name) ⇒ Object
Alias for the class method, for brevity
# File 'lib/stepper_motor/journey.rb', line 107
def lookup_step_definition(by_step_name)
  self.class.lookup_step_definition(by_step_name)
end
#perform_next_step! ⇒ void
This method returns an undefined value.
Performs the next step in the journey. Will check whether any other process has performed the step already and whether the record is unchanged, and will then lock it and set the state to ‘performing’.
After setting the state, it will determine the next step to perform, and perform it. Depending on the outcome of
the step another PerformStepJob may get enqueued. If the journey ends here, the journey record will set its state
to ‘finished’.
# File 'lib/stepper_motor/journey.rb', line 140
def perform_next_step!
  # Make sure we can't start running the same step of the same journey twice
  next_step_name_before_locking = next_step_name
  with_lock do
    # Make sure no other worker has snatched this journey and made steps instead of us
    return unless ready? && next_step_name == next_step_name_before_locking
    performing!
    after_locking_for_step(next_step_name)
  end
  current_step_name = next_step_name

  if current_step_name
    logger.debug { "preparing to perform step #{current_step_name}" }
  else
    logger.debug { "no next step - finishing journey" }
    # If there is no step set - just terminate the journey
    return finished! unless current_step_name
  end

  before_step_starts(current_step_name)

  # Recover the step definition
  @current_step_definition = lookup_step_definition(current_step_name)

  unless @current_step_definition
    logger.debug { "no definition for #{current_step_name} - finishing journey" }
    return finished!
  end

  # If we tried to run the step but it is not yet time to do so,
  # enqueue a new job to perform it and stop
  if next_step_to_be_performed_at > Time.current
    logger.warn { "tried to perform #{current_step_name} prematurely" }
    schedule!
    return ready!
  end

  # Perform the actual step
  increment!(:steps_entered)
  logger.debug { "entering step #{current_step_name}" }

  catch(:abort_step) do
    instance_exec(&@current_step_definition)
  end

  # By the end of the step the Journey must either be untouched or saved
  if changed?
    raise StepperMotor::JourneyNotPersisted, <<~MSG
      #{self} had its attributes changed but was not saved inside step #{current_step_name.inspect}
      this means that the subsequent execution (which may be done asynchronously) is likely to see
      a stale Journey, and will execute incorrectly. If you mutate the Journey inside
      of a step, make sure to call `save!` or use methods that save in-place
      (such as `increment!`).
    MSG
  end

  increment!(:steps_completed)
  logger.debug { "completed #{current_step_name} without exceptions" }

  if canceled?
    # The step aborted the journey, nothing to do
    logger.info { "has been canceled inside #{current_step_name}" }
  elsif @reattempt_after
    # The step asked the actions to be attempted at a later time
    logger.info { "will reattempt #{current_step_name} in #{@reattempt_after} seconds" }
    update!(previous_step_name: current_step_name, next_step_name: current_step_name, next_step_to_be_performed_at: Time.current + @reattempt_after)
    schedule!
    ready!
  elsif finished?
    logger.info { "was marked finished inside the step" }
    update!(previous_step_name: current_step_name, next_step_name: nil)
  elsif (next_step_definition = step_definitions[@current_step_definition.seq + 1])
    logger.info { "will continue to #{next_step_definition.name}" }
    set_next_step_and_enqueue(next_step_definition)
    ready!
  else
    # The hero's journey is complete
    logger.info { "journey completed" }
    finished!
    update!(previous_step_name: current_step_name, next_step_name: nil)
  end
ensure
  # The instance variables must not be present if `perform_next_step!` gets called
  # on this same object again. This will be the case if the steps are performed inline
  # and not via background jobs (which reload the model)
  @reattempt_after = nil
  @current_step_definition = nil
  after_step_completes(current_step_name) if current_step_name
end
#reattempt!(wait: nil) ⇒ Object
Inside a step it is possible to ask StepperMotor to retry the step at a later point in time. Maybe now is an inconvenient moment
(are you about to send a push notification at 3AM perhaps?). The wait: parameter specifies how long to defer reattempting the step for.
If wait: is omitted, the wait configured for the step definition is used.
Reattempting will restart the step from the beginning, so the step should be idempotent.
Calling reattempt! will abort the execution of the current step.
# File 'lib/stepper_motor/journey.rb', line 126
def reattempt!(wait: nil)
  # The default `wait` is the one for the step definition
  @reattempt_after = wait || @current_step_definition.wait || 0
  throw :abort_step
end
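To make the delay resolution concrete, here is a standalone sketch of the "3AM push notification" scenario. It only models how the delay is chosen (explicit wait:, else the step's own wait, else 0) and how the step is abandoned; rescheduling is left out. ReattemptSketch, QUIET_HOURS, and seconds_until_quiet_hours_end are hypothetical names introduced for illustration and are not part of StepperMotor.

```ruby
# ReattemptSketch is a hypothetical stand-in, not StepperMotor::Journey.
class ReattemptSketch
  attr_reader :reattempt_after

  def initialize(step_wait:)
    @step_wait = step_wait # plays the role of @current_step_definition.wait
  end

  # Explicit wait: wins, then the step's own wait, then 0.
  def reattempt!(wait: nil)
    @reattempt_after = wait || @step_wait || 0
    throw :abort_step
  end

  # Returns true if the block ran to the end, false if it was abandoned.
  def perform(&step_block)
    @reattempt_after = nil
    completed = false
    catch(:abort_step) do
      instance_exec(&step_block)
      completed = true
    end
    completed
  end
end

QUIET_HOURS = (0...8) # hypothetical policy: no notifications before 08:00

def seconds_until_quiet_hours_end(now)
  morning = Time.new(now.year, now.month, now.day, QUIET_HOURS.end, 0, 0, now.utc_offset)
  (morning - now).to_i
end

journey = ReattemptSketch.new(step_wait: 3600)
now = Time.new(2024, 1, 1, 3, 0, 0, "+00:00") # fixed 03:00 timestamp for the example
notified = false
completed = journey.perform do
  reattempt!(wait: seconds_until_quiet_hours_end(now)) if QUIET_HOURS.cover?(now.hour)
  notified = true # skipped while quiet hours are in effect
end
deferred_for = journey.reattempt_after

journey.perform { reattempt! } # no wait: given, falls back to the step's wait
default_delay = journey.reattempt_after
```

Because a reattempted step starts again from its first line, any side effect placed before the reattempt! call would be repeated on the next attempt, which is why the guard comes first here.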
#schedule! ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 264
def schedule!
  StepperMotor.scheduler.schedule(self)
end
#set_next_step_and_enqueue(next_step_definition) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 238
def set_next_step_and_enqueue(next_step_definition)
  wait = next_step_definition.wait
  update!(previous_step_name: next_step_name, next_step_name: next_step_definition.name, next_step_to_be_performed_at: Time.current + wait)
  schedule!
end
#step_definitions ⇒ Object
Alias for the class attribute, for brevity
# File 'lib/stepper_motor/journey.rb', line 42
class_attribute :step_definitions, default: []
#time_remaining_until_final_step ⇒ ActiveSupport::Duration
# File 'lib/stepper_motor/journey.rb', line 231
def time_remaining_until_final_step
  current_step_seq = @current_step_definition&.seq || -1
  subsequent_steps = step_definitions.select { |definition| definition.seq > current_step_seq }
  seconds_remaining = subsequent_steps.map { |definition| definition.wait.to_f }.sum
  seconds_remaining.seconds # Convert to ActiveSupport::Duration
end
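Reading the source above, the result is the sum of the configured waits of all steps that come after the current one, or of every step when no step is currently being performed. The standalone sketch below mirrors that calculation with plain Float seconds instead of ActiveSupport::Duration, so it runs without Rails. TimedStep and seconds_remaining_after are hypothetical names used only for this illustration.

```ruby
# TimedStep is a hypothetical stand-in for StepperMotor::Step (illustration only).
TimedStep = Struct.new(:name, :wait, :seq, keyword_init: true)

# Sums the waits of all steps whose seq is greater than current_seq.
# A nil current_seq is treated as -1, so every step is counted.
def seconds_remaining_after(step_definitions, current_seq)
  current_seq ||= -1
  step_definitions
    .select { |definition| definition.seq > current_seq }
    .sum { |definition| definition.wait.to_f }
end

steps = [
  TimedStep.new(name: "step_1", wait: 0, seq: 0),
  TimedStep.new(name: "step_2", wait: 259_200, seq: 1),
  TimedStep.new(name: "step_3", wait: 259_200, seq: 2)
]

total = seconds_remaining_after(steps, nil)      # no current step: all waits count
after_second = seconds_remaining_after(steps, 1) # only step_3 remains
```

Note that this is a sum of configured waits; as written, it does not subtract time that has already elapsed while waiting for the next step.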
#to_global_id ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 268
def to_global_id
  # This gets included into ActiveModel during Rails bootstrap,
  # for now do this manually
  GlobalID.create(self, app: "stepper-motor")
end