Class: StepperMotor::Journey
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - StepperMotor::Journey
- Includes: FlowControl, Recovery
- Defined in: lib/stepper_motor/journey.rb
Overview
A Journey is the main building block of StepperMotor. You create a journey to guide a particular model
("hero") through a sequence of steps. Any of your models can be the hero, and a hero can have multiple Journeys.
To create your own Journey, subclass StepperMotor::Journey and define your steps. For example, a drip mail
campaign can look like this:
class ResubscribeCampaign < StepperMotor::Journey
  step do
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    hero.close_account!
  end
end
Creating a record for the Journey (just using create!) will instantly send your hero on their way:
ResubscribeCampaign.create!(hero: current_account)
To stop the journey forcibly, delete it from your database, or call cancel! within any of the steps.
Defined Under Namespace
Modules: FlowControl, Recovery
Constant Summary
- STATES = %w[ready paused performing canceled finished]
Class Method Summary
- .cancel_if(condition_arg = :__no_argument_given__, &condition_blk) ⇒ void
  Defines a condition that will cause the journey to cancel if satisfied.
- .lookup_step_definition(by_step_name) ⇒ StepperMotor::Step?
  Returns the Step object for a named step.
- .step(name = nil, wait: nil, after: nil, before_step: nil, after_step: nil, **additional_step_definition_options, &blk) ⇒ StepperMotor::Step
  Defines a step in the journey.
- .step_definitions_following(step_definition) ⇒ Array<StepperMotor::Step>
  Returns all step definitions that follow the given step in the journey.
Instance Method Summary
- #after_locking_for_step(step_name) ⇒ Object
- #after_performing_step_with_exception(step_name, exception) ⇒ Object
- #after_performing_step_without_exception(step_name) ⇒ Object
- #before_step_starts(step_name) ⇒ Object
- #cancel_if_conditions ⇒ Array<StepperMotor::Conditional>
  The cancel_if conditions defined for this journey class.
- #logger ⇒ Object
- #lookup_step_definition(by_step_name) ⇒ Object
  Alias for the class method, for brevity.
- #perform_next_step!(idempotency_key: nil) ⇒ void
  Performs the next step in the journey.
- #schedule! ⇒ Object
- #set_next_step_and_enqueue(next_step_definition, wait: nil) ⇒ Object
- #step_definitions ⇒ Object
  Alias for the class attribute, for brevity.
- #time_remaining_until_final_step ⇒ ActiveSupport::Duration
Methods included from Recovery
Methods included from FlowControl
#cancel!, #pause!, #reattempt!, #resume!, #skip!
Class Method Details
.cancel_if(condition_arg = :__no_argument_given__, &condition_blk) ⇒ void
This method returns an undefined value.
Defines a condition that will cause the journey to cancel if satisfied.
This works like Rails' etag: it's class-inheritable and appendable.
Multiple cancel_if calls can be made to a Journey definition.
All conditions are evaluated after setting the state to performing.
If any condition is satisfied, the journey will cancel.
# File 'lib/stepper_motor/journey.rb', line 221
def self.cancel_if(condition_arg = :__no_argument_given__, &condition_blk)
  # Check if neither argument nor block is provided
  if condition_arg == :__no_argument_given__ && !condition_blk
    raise ArgumentError, "cancel_if requires either a condition argument or a block"
  end

  # Check if both argument and block are provided
  if condition_arg != :__no_argument_given__ && condition_blk
    raise ArgumentError, "cancel_if accepts either a condition argument or a block, but not both"
  end

  # Select the condition: positional argument takes precedence if not sentinel
  condition = if condition_arg != :__no_argument_given__
    condition_arg
  else
    condition_blk
  end

  conditional = StepperMotor::Conditional.new(condition)

  # As per Rails docs: you need to be aware when using class_attribute with mutable structures
  # as Array or Hash. In such cases, you don't want to do changes in place. Instead use setters.
  # See https://apidock.com/rails/v7.1.3.2/Class/class_attribute
  self.cancel_if_conditions = cancel_if_conditions + [conditional]
end
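The "class-inheritable and appendable" behaviour can be illustrated without Rails. The following is a minimal plain-Ruby sketch, not the library's code: `SketchJourney`, `conditions` and `cancel?` are hypothetical stand-ins, the sketch accepts only blocks (the real method also takes a positional argument), and the real implementation relies on ActiveSupport's `class_attribute` and `StepperMotor::Conditional`.

```ruby
# Plain-Ruby sketch of class-inheritable, appendable conditions.
# SketchJourney and its methods are hypothetical, not StepperMotor's API.
class SketchJourney
  # Each class reads its own list, falling back to the parent's list.
  def self.conditions
    @conditions || (superclass.respond_to?(:conditions) ? superclass.conditions : [])
  end

  def self.cancel_if(&condition_blk)
    raise ArgumentError, "cancel_if requires a block in this sketch" unless condition_blk
    # Build a new array instead of appending in place, so the parent's list stays untouched.
    @conditions = conditions + [condition_blk]
  end

  attr_reader :hero

  def initialize(hero)
    @hero = hero
  end

  # True when any condition holds; blocks run in the context of the journey instance.
  def cancel?
    self.class.conditions.any? { |condition| instance_exec(&condition) }
  end
end

class BaseCampaign < SketchJourney
  cancel_if { hero[:deleted] }
end

class ChildCampaign < BaseCampaign
  cancel_if { hero[:active] }
end
```

In this sketch `ChildCampaign` ends up with two conditions while `BaseCampaign` keeps one, which is the property the setter-based assignment in the listing is meant to preserve.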
.lookup_step_definition(by_step_name) ⇒ StepperMotor::Step?
Returns the Step object for a named step. This is used when performing a step, but can also
be useful in other contexts.
# File 'lib/stepper_motor/journey.rb', line 184
def self.lookup_step_definition(by_step_name)
  step_definitions.find { |d| d.name.to_s == by_step_name.to_s }
end
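Because both sides of the comparison go through `to_s`, a step can be looked up by Symbol or by String, and an unknown name yields `nil` (hence the `Step?` return type). A small standalone sketch, where `StepSketch` is a hypothetical stand-in for `StepperMotor::Step`:

```ruby
# StepSketch is a hypothetical stand-in for StepperMotor::Step.
StepSketch = Struct.new(:name, :wait, keyword_init: true)

definitions = [
  StepSketch.new(name: "send_invite", wait: 0),
  StepSketch.new(name: "close_account", wait: 259_200)
]

# Same comparison as the listing: both names are converted with to_s.
lookup = ->(by_step_name) { definitions.find { |d| d.name.to_s == by_step_name.to_s } }

found_by_symbol = lookup.call(:close_account)
found_by_string = lookup.call("close_account")
missing = lookup.call(:no_such_step)
```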
.step(name = nil, wait: nil, after: nil, before_step: nil, after_step: nil, **additional_step_definition_options, &blk) ⇒ StepperMotor::Step
Defines a step in the journey. Steps are stacked top to bottom and get performed in sequence.
# File 'lib/stepper_motor/journey.rb', line 98
def self.step(name = nil, wait: nil, after: nil, before_step: nil, after_step: nil, **additional_step_definition_options, &blk)
  # Handle the if: alias for backward compatibility
  if additional_step_definition_options.key?(:if) && additional_step_definition_options.key?(:skip_if)
    raise StepConfigurationError, "Either skip_if: or if: can be specified, but not both"
  end
  if additional_step_definition_options.key?(:if)
    # Convert if: to skip_if:
    additional_step_definition_options[:skip_if] = StepperMotor::Conditional.new(additional_step_definition_options.delete(:if), negate: true)
  end

  # Validate before_step and after_step parameters
  if before_step && after_step
    raise StepConfigurationError, "Either before_step: or after_step: can be specified, but not both"
  end

  # Validate that referenced steps exist
  if before_step
    referenced_step = lookup_step_definition(before_step)
    unless referenced_step
      raise StepConfigurationError, "Step named #{before_step.inspect} not found for before_step: parameter"
    end
  end

  if after_step
    referenced_step = lookup_step_definition(after_step)
    unless referenced_step
      raise StepConfigurationError, "Step named #{after_step.inspect} not found for after_step: parameter"
    end
  end

  wait = if wait && after&.respond_to?(:to_f)
    raise StepConfigurationError, "Either wait: or after: can be specified, but not both"
  elsif !wait && (!after || !after.respond_to?(:to_f))
    0
  elsif after&.respond_to?(:to_f)
    accumulated = step_definitions.map(&:wait).sum
    after - accumulated
  else
    wait
  end
  raise StepConfigurationError, "wait: cannot be negative, but computed was #{wait}s" if wait.negative?

  if name.blank? && blk.blank?
    raise StepConfigurationError, <<~MSG
      Step #{step_definitions.length + 1} of #{self} has no explicit name,
      and no block with step definition has been provided. Without a name the step
      must be defined with a block to execute. If you want an instance method of
      this Journey to be used as the step, pass the name of the method as the name of the step.
    MSG
  end

  name ||= "step_%d" % (step_definitions.length + 1)
  name = name.to_s

  known_step_names = step_definitions.map(&:name)
  raise StepConfigurationError, "Step named #{name.inspect} already defined" if known_step_names.include?(name)

  # Create the step definition
  step_definition = StepperMotor::Step.new(name: name, wait: wait, **additional_step_definition_options, &blk)

  # Determine insertion position based on before_step or after_step parameters
  if before_step
    target_step = lookup_step_definition(before_step)
    target_index = step_definitions.index(target_step)
    new_step_definitions = step_definitions.dup
    new_step_definitions.insert(target_index, step_definition)
    self.step_definitions = new_step_definitions
  elsif after_step
    target_step = lookup_step_definition(after_step)
    target_index = step_definitions.index(target_step)
    new_step_definitions = step_definitions.dup
    new_step_definitions.insert(target_index + 1, step_definition)
    self.step_definitions = new_step_definitions
  else
    # Default behavior: append to the end
    self.step_definitions = step_definitions + [step_definition]
  end

  step_definition
end
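The difference between `wait:` and `after:` is easiest to see with numbers: `wait:` is relative to the preceding step, while `after:` is measured from the start of the journey and gets converted into a wait by subtracting the waits accumulated so far. The sketch below mirrors that branch in simplified form. `resolve_wait` is a hypothetical helper, it uses `ArgumentError` in place of `StepConfigurationError`, and it works on plain seconds rather than `ActiveSupport::Duration`.

```ruby
# Hypothetical helper mirroring the wait:/after: resolution in simplified form.
# previous_waits holds the waits (in seconds) of the steps defined so far.
def resolve_wait(previous_waits, wait: nil, after: nil)
  raise ArgumentError, "Either wait: or after: can be specified, but not both" if wait && after

  resolved =
    if after
      after - previous_waits.sum # after: counts from the start of the journey
    else
      wait || 0                  # no timing given means "run immediately"
    end

  raise ArgumentError, "wait: cannot be negative, but computed was #{resolved}s" if resolved.negative?
  resolved
end

day = 86_400
waits = []
waits << resolve_wait(waits)                 # first step, no delay
waits << resolve_wait(waits, wait: 3 * day)  # 3 days after the first step
waits << resolve_wait(waits, after: 5 * day) # 5 days after the journey started
```

Here the third step ends up with a wait of 2 days, because 3 days have already accumulated. Asking for `after: 1 * day` at this point would compute a negative wait and raise, which matches the guard in the listing.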
.step_definitions_following(step_definition) ⇒ Array<StepperMotor::Step>
Returns all step definitions that follow the given step in the journey.
# File 'lib/stepper_motor/journey.rb', line 192
def self.step_definitions_following(step_definition)
  current_index = step_definitions.index(step_definition)
  return [] unless current_index
  step_definitions[(current_index + 1)..]
end
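The edge cases are worth spelling out: the last step and an unknown step both produce an empty array rather than `nil`. A standalone sketch of the same slicing logic, using plain strings in place of `StepperMotor::Step` objects (the `following` helper is hypothetical):

```ruby
# Hypothetical helper applying the same index-and-slice logic to any array.
def following(definitions, definition)
  current_index = definitions.index(definition)
  return [] unless current_index
  definitions[(current_index + 1)..]
end

steps = %w[welcome reminder final_notice]

after_first = following(steps, "welcome")
after_last = following(steps, "final_notice")
after_unknown = following(steps, "not_a_step")
```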
Instance Method Details
#after_locking_for_step(step_name) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 413
def after_locking_for_step(step_name)
end
#after_performing_step_with_exception(step_name, exception) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 416
def after_performing_step_with_exception(step_name, exception)
end
#after_performing_step_without_exception(step_name) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 422
def after_performing_step_without_exception(step_name)
end
#before_step_starts(step_name) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 419
def before_step_starts(step_name)
end
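These four methods have empty bodies and are called from perform_next_step!, so they appear to be intended as overridable hooks. The sketch below shows how a subclass might use them. It is an assumption-laden illustration: `HookedJourneySketch` is a hypothetical stand-in that only imitates the call order seen in perform_next_step!, and `AuditedJourney` and its `events` list are invented for the example.

```ruby
# HookedJourneySketch is a hypothetical stand-in, not the library class.
# It calls the hooks in the same order as perform_next_step! does.
class HookedJourneySketch
  def after_locking_for_step(step_name); end
  def before_step_starts(step_name); end
  def after_performing_step_with_exception(step_name, exception); end
  def after_performing_step_without_exception(step_name); end

  def perform(step_name)
    after_locking_for_step(step_name)
    before_step_starts(step_name)
    error = nil
    begin
      yield
    rescue => e
      error = e
    end
    if error
      after_performing_step_with_exception(step_name, error)
      raise error # the original exception is re-raised after the hook
    end
    after_performing_step_without_exception(step_name)
  end
end

# A subclass overriding the hooks to keep an in-memory audit trail.
class AuditedJourney < HookedJourneySketch
  attr_reader :events

  def initialize
    @events = []
  end

  def before_step_starts(step_name)
    @events << [:started, step_name]
  end

  def after_performing_step_with_exception(step_name, exception)
    @events << [:failed, step_name, exception.class.name]
  end

  def after_performing_step_without_exception(step_name)
    @events << [:completed, step_name]
  end
end

journey = AuditedJourney.new
journey.perform("step_1") { :ok }
begin
  journey.perform("step_2") { raise ArgumentError, "boom" }
rescue ArgumentError
  # Expected: the failure hook ran first, then the error propagated.
end
```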
#cancel_if_conditions ⇒ Array<StepperMotor::Conditional>
Returns the cancel_if conditions defined for this journey class.
# File 'lib/stepper_motor/journey.rb', line 51
class_attribute :cancel_if_conditions, default: []
#logger ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 402
def logger
  if (logger_from_parent = super)
    tag = [self.class.to_s, to_param].join(":")
    tag << " at " << @current_step_definition.name if @current_step_definition
    logger_from_parent.tagged(tag)
  else
    # Furnish a "null logger"
    ActiveSupport::Logger.new(nil)
  end
end
#lookup_step_definition(by_step_name) ⇒ Object
Alias for the class method, for brevity.
# File 'lib/stepper_motor/journey.rb', line 208
def lookup_step_definition(by_step_name)
  self.class.lookup_step_definition(by_step_name)
end
#perform_next_step!(idempotency_key: nil) ⇒ void
This method returns an undefined value.
Performs the next step in the journey. It first checks whether any other process has already performed the step
and whether the record is unchanged, then locks the record and sets the state to 'performing'.
After setting the state, it determines the next step to perform and performs it. Depending on the outcome of
the step, another PerformStepJob may get enqueued. If the journey ends here, the journey record will set its state
to 'finished'.
# File 'lib/stepper_motor/journey.rb', line 257
def perform_next_step!(idempotency_key: nil)
  # Make sure we can't start running the same step of the same journey twice
  next_step_name_before_locking = next_step_name
  with_lock do
    # Make sure no other worker has snatched this journey and made steps instead of us
    return unless ready? && next_step_name == next_step_name_before_locking
    # Check idempotency key if both are present
    return if idempotency_key && idempotency_key != self.idempotency_key

    performing!
    after_locking_for_step(next_step_name)
  end

  # Check cancel_if conditions after setting state to performing
  if cancel_if_conditions.any? { |conditional| conditional.satisfied_by?(self) }
    logger.info { "cancel_if condition satisfied, canceling journey" }
    cancel!
    return
  end

  current_step_name = next_step_name

  if current_step_name
    logger.debug { "preparing to perform step #{current_step_name}" }
  else
    logger.debug { "no next step - finishing journey" }
    # If there is no step set - just terminate the journey
    return finished! unless current_step_name
  end

  before_step_starts(current_step_name)

  # Recover the step definition
  @current_step_definition = lookup_step_definition(current_step_name)

  unless @current_step_definition
    logger.debug { "no definition for #{current_step_name} - finishing journey" }
    return finished!
  end

  # If we tried to run the step but it is not yet time to do so,
  # enqueue a new job to perform it and stop
  if next_step_to_be_performed_at > Time.current
    logger.warn { "tried to perform #{current_step_name} prematurely" }
    schedule!
    return ready!
  end

  # Perform the actual step
  increment!(:steps_entered)
  logger.debug { "entering step #{current_step_name}" }

  # The flow control for reattempt! and cancel! happens inside perform_in_context_of
  ex_rescued_at_perform = nil
  begin
    @current_step_definition.perform_in_context_of(self)
  rescue => e
    ex_rescued_at_perform = e
    logger.debug { "#{e} raised during #{@current_step_definition.name}, will be re-raised after" }
  end

  # By the end of the step the Journey must either be untouched or saved
  if changed?
    raise StepperMotor::JourneyNotPersisted, <<~MSG
      #{self} had its attributes changed but was not saved inside step #{current_step_name.inspect}
      this means that the subsequent execution (which may be done asynchronously) is likely to see
      a stale Journey, and will execute incorrectly. If you mutate the Journey inside
      of a step, make sure to call `save!` or use methods that save in-place
      (such as `increment!`).
    MSG
  end

  if ex_rescued_at_perform
    logger.warn { "performed #{current_step_name}, #{ex_rescued_at_perform} was raised" }
  else
    increment!(:steps_completed)
    logger.debug { "performed #{current_step_name} without exceptions" }
  end

  if paused? || canceled?
    # The step made arrangements regarding how we should continue, nothing to do
    logger.info { "has been #{state} inside #{current_step_name}" }
  elsif @reattempt_after
    # The step asked the actions to be attempted at a later time
    logger.info { "will reattempt #{current_step_name} in #{@reattempt_after} seconds" }
    set_next_step_and_enqueue(@current_step_definition, wait: @reattempt_after)
    ready!
  elsif @skip_current_step
    # The step asked to be skipped
    next_step_definition = self.class.step_definitions_following(@current_step_definition).first

    if next_step_definition
      # There are more steps after this one - schedule the next step
      logger.info { "skipping current step #{current_step_name}, will continue to #{next_step_definition.name}" }
      set_next_step_and_enqueue(next_step_definition)
      ready!
    else
      # This is the last step - finish the journey
      logger.info { "skipping current step #{current_step_name}, finishing journey" }
      finished!
      update!(previous_step_name: current_step_name, next_step_name: nil)
    end
  elsif finished?
    logger.info { "was marked finished inside the step" }
    update!(previous_step_name: current_step_name, next_step_name: nil)
  elsif (next_step_definition = self.class.step_definitions_following(@current_step_definition).first)
    logger.info { "will continue to #{next_step_definition.name}" }
    set_next_step_and_enqueue(next_step_definition)
    ready!
  else
    logger.info { "has finished" }
    # The hero's journey is complete
    finished!
    update!(previous_step_name: current_step_name, next_step_name: nil)
  end
ensure
  # The instance variables must not be present if `perform_next_step!` gets called
  # on this same object again. This will be the case if the steps are performed inline
  # and not via background jobs (which reload the model). This should actually be solved
  # using some object that contains the state of the action later, but for now - the dirty approach is fine.
  @reattempt_after = nil
  @skip_current_step = nil
  @current_step_definition = nil
  # Re-raise the exception, now that we have persisted the Journey according to the recovery policy
  if ex_rescued_at_perform
    after_performing_step_with_exception(current_step_name, ex_rescued_at_perform) if current_step_name
    raise ex_rescued_at_perform
  elsif current_step_name
    after_performing_step_without_exception(current_step_name)
  end
end
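The branch that runs once the step body has finished decides what happens to the journey next, and the order of the checks matters. The following sketch condenses that order into a pure function so it can be read at a glance. `next_action` and the symbols it returns are hypothetical names chosen for this illustration; the real method mutates the record and enqueues jobs instead of returning a value.

```ruby
# Hypothetical summary of the post-step branch order in perform_next_step!.
# state:           journey state after the step body ran
# reattempt_after: seconds requested via reattempt!, if any
# skip:            whether the step asked to be skipped
# next_step:       name of the following step, or nil on the last step
def next_action(state:, reattempt_after: nil, skip: false, next_step: nil)
  if %w[paused canceled].include?(state)
    [:nothing]                               # the step already decided how to continue
  elsif reattempt_after
    [:reschedule_current, reattempt_after]   # same step again, later
  elsif skip
    next_step ? [:schedule, next_step] : [:finish]
  elsif state == "finished"
    [:record_finished]                       # finished inside the step itself
  elsif next_step
    [:schedule, next_step]
  else
    [:finish]                                # no more steps left
  end
end

normal = next_action(state: "performing", next_step: "reminder")
canceled = next_action(state: "canceled", next_step: "reminder")
retried = next_action(state: "performing", reattempt_after: 60, next_step: "reminder")
last_step = next_action(state: "performing")
```

Note that a paused or canceled state takes priority over a pending reattempt, and a reattempt takes priority over a skip, as in the listing.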
#schedule! ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 425
def schedule!
  StepperMotor.scheduler.schedule(self)
end
#set_next_step_and_enqueue(next_step_definition, wait: nil) ⇒ Object
# File 'lib/stepper_motor/journey.rb', line 395
def set_next_step_and_enqueue(next_step_definition, wait: nil)
  wait ||= next_step_definition.wait
  next_idempotency_key = SecureRandom.base36(16)
  update!(previous_step_name: next_step_name, next_step_name: next_step_definition.name, next_step_to_be_performed_at: Time.current + wait, idempotency_key: next_idempotency_key)
  schedule!
end
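The attributes written by this method can be computed in isolation, which makes the two uses of `wait:` easier to compare: the step's own wait when advancing, or an explicit override when reattempting. This is a rough sketch under stated assumptions: `StepDef` and `next_step_attributes` are hypothetical, `SecureRandom.alphanumeric` stands in for ActiveSupport's `SecureRandom.base36`, and a fixed `Time` replaces `Time.current`.

```ruby
require "securerandom"

# StepDef is a hypothetical stand-in for StepperMotor::Step.
StepDef = Struct.new(:name, :wait, keyword_init: true)

# Hypothetical helper returning the attributes that would be passed to update!.
def next_step_attributes(current_next_step_name, next_step_definition, now:, wait: nil)
  wait ||= next_step_definition.wait # an explicit wait: overrides the step's own
  {
    previous_step_name: current_next_step_name,
    next_step_name: next_step_definition.name,
    next_step_to_be_performed_at: now + wait,
    # Stand-in for SecureRandom.base36(16), which requires ActiveSupport.
    idempotency_key: SecureRandom.alphanumeric(16)
  }
end

now = Time.utc(2025, 1, 1, 12, 0, 0)
reminder = StepDef.new(name: "reminder", wait: 3 * 86_400)

scheduled = next_step_attributes("welcome", reminder, now: now)
reattempt = next_step_attributes("reminder", reminder, now: now, wait: 600)
```

Each call produces a fresh idempotency key. Judging by the key comparison at the top of perform_next_step!, a job enqueued with an older key would return early instead of performing the step.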
#step_definitions ⇒ Object
Alias for the class attribute, for brevity.
# File 'lib/stepper_motor/journey.rb', line 48
class_attribute :step_definitions, default: []
#time_remaining_until_final_step ⇒ ActiveSupport::Duration
# File 'lib/stepper_motor/journey.rb', line 389
def time_remaining_until_final_step
  subsequent_steps = @current_step_definition ? self.class.step_definitions_following(@current_step_definition) : step_definitions
  seconds_remaining = subsequent_steps.map { |definition| definition.wait.to_f }.sum
  seconds_remaining.seconds # Convert to ActiveSupport::Duration
end
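As a worked example, the sketch below reproduces the summation for a journey shaped like ResubscribeCampaign from the overview (one immediate step followed by three steps with 3-day waits). `StepWait` and `seconds_remaining` are hypothetical, and the result is a plain number of seconds rather than an `ActiveSupport::Duration`.

```ruby
# StepWait is a hypothetical stand-in for StepperMotor::Step.
StepWait = Struct.new(:name, :wait)

# Sums the waits of the steps after `current`, or of all steps when no
# current step is known (as happens outside of a running step).
def seconds_remaining(definitions, current = nil)
  return definitions.sum { |d| d.wait.to_f } unless current

  index = definitions.index(current)
  following = index ? definitions.drop(index + 1) : []
  following.sum { |d| d.wait.to_f }
end

three_days = 3 * 86_400
campaign = [
  StepWait.new("step_1", 0),
  StepWait.new("step_2", three_days),
  StepWait.new("step_3", three_days),
  StepWait.new("step_4", three_days)
]

total = seconds_remaining(campaign)                   # 9 days in seconds
during_step_2 = seconds_remaining(campaign, campaign[1]) # 6 days in seconds
during_last = seconds_remaining(campaign, campaign.last)
```

Since `@current_step_definition` is set only while perform_next_step! is running and is cleared in its `ensure` block, calling the method outside of a step would, by this reading, sum the waits of every step in the journey.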