Class: StepperMotor::Journey

Inherits:
ActiveRecord::Base
  • Object
Includes:
FlowControl, Recovery
Defined in:
lib/stepper_motor/journey.rb

Overview

A Journey is the main building block of StepperMotor. You create a journey to guide a particular model ("hero") through a sequence of steps. Any of your models can be the hero and have multiple Journeys. To create your own Journey, subclass the StepperMotor::Journey class and define your steps. For example, a drip mail campaign can look like this:

class ResubscribeCampaign < StepperMotor::Journey
  step do
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    hero.close_account!
  end
end

Creating a record for the Journey (just using create!) will instantly send your hero on their way:

ResubscribeCampaign.create!(hero: )

To stop the journey forcibly, delete it from your database - or call cancel! within any of the steps.

Defined Under Namespace

Modules: FlowControl, Recovery

Constant Summary

STATES =
%w[ready paused performing canceled finished]

Class Method Summary

Instance Method Summary

Methods included from Recovery

#recover!

Methods included from FlowControl

#cancel!, #pause!, #reattempt!, #resume!, #skip!

Class Method Details

.cancel_if(condition_arg = :__no_argument_given__, &condition_blk) ⇒ void

This method returns an undefined value.

Defines a condition that will cause the journey to cancel if satisfied. This works like Rails' etag - it's class-inheritable and appendable. Multiple cancel_if calls can be made to a Journey definition. All conditions are evaluated after setting the state to performing. If any condition is satisfied, the journey will cancel.

Parameters:

  • condition_arg (TrueClass, FalseClass, Symbol, Proc, Array, Conditional) (defaults to: :__no_argument_given__)

    the condition to check

  • condition_blk (Proc)

    a block that will be evaluated as a condition



# File 'lib/stepper_motor/journey.rb', line 221

def self.cancel_if(condition_arg = :__no_argument_given__, &condition_blk)
  # Check if neither argument nor block is provided
  if condition_arg == :__no_argument_given__ && !condition_blk
    raise ArgumentError, "cancel_if requires either a condition argument or a block"
  end

  # Check if both argument and block are provided
  if condition_arg != :__no_argument_given__ && condition_blk
    raise ArgumentError, "cancel_if accepts either a condition argument or a block, but not both"
  end

  # Select the condition: positional argument takes precedence if not sentinel
  condition = if condition_arg != :__no_argument_given__
    condition_arg
  else
    condition_blk
  end

  conditional = StepperMotor::Conditional.new(condition)

  # As per Rails docs: you need to be aware when using class_attribute with mutable structures
  # as Array or Hash. In such cases, you don't want to do changes in place. Instead use setters.
  # See https://apidock.com/rails/v7.1.3.2/Class/class_attribute
  self.cancel_if_conditions = cancel_if_conditions + [conditional]
end
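
The two ideas in this method, the sentinel default argument and the copy-on-append class-level list, can be sketched without Rails. This is a minimal standalone illustration, not the library's implementation: SketchJourney, SketchChild and the hand-rolled conditions reader are hypothetical stand-ins for the Journey class and for class_attribute, and the conditions are stored raw rather than wrapped in StepperMotor::Conditional.

```ruby
# Standalone sketch; SketchJourney and SketchChild are hypothetical classes
# that do not use StepperMotor or ActiveSupport.
class SketchJourney
  NO_ARGUMENT = :__no_argument_given__

  class << self
    attr_writer :conditions

    # Read this class's own list, falling back to the parent's list.
    def conditions
      return @conditions if defined?(@conditions)
      superclass.respond_to?(:conditions) ? superclass.conditions : []
    end

    def cancel_if(condition_arg = NO_ARGUMENT, &condition_blk)
      given = condition_arg != NO_ARGUMENT
      raise ArgumentError, "a condition argument or a block is required" if !given && !condition_blk
      raise ArgumentError, "pass a condition argument or a block, not both" if given && condition_blk

      # Build a new array rather than appending in place with <<,
      # so the list inherited from the parent class stays untouched.
      self.conditions = conditions + [given ? condition_arg : condition_blk]
    end
  end
end

# The sentinel default lets false (or nil) be passed as a real condition.
SketchJourney.cancel_if(false)

class SketchChild < SketchJourney
  cancel_if :hero_gone?
end

p SketchJourney.conditions # => [false]
p SketchChild.conditions   # => [false, :hero_gone?]
```

Appending with + creates a new array for the subclass, which is why the parent's list still holds a single condition after the subclass adds its own.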

.lookup_step_definition(by_step_name) ⇒ StepperMotor::Step?

Returns the Step object for a named step. This is used when performing a step, but can also be useful in other contexts.

Parameters:

  • by_step_name (Symbol, String)

    the name of the step to find

Returns:

  • (StepperMotor::Step, nil)

    the matching step definition, or nil if no step has that name



# File 'lib/stepper_motor/journey.rb', line 184

def self.lookup_step_definition(by_step_name)
  step_definitions.find { |d| d.name.to_s == by_step_name.to_s }
end
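
Because both sides of the comparison are converted with to_s, a step can be looked up by string or by symbol. The following is a small standalone sketch of that behaviour; SketchNamedStep and SketchLookup are hypothetical names and not part of StepperMotor.

```ruby
# Hypothetical stand-in for a step definition; only the name matters here.
SketchNamedStep = Struct.new(:name)

module SketchLookup
  DEFINITIONS = [SketchNamedStep.new("welcome"), SketchNamedStep.new("step_2")].freeze

  # Compare both sides as strings, so "welcome" and :welcome find the same step.
  def self.lookup(by_step_name)
    DEFINITIONS.find { |d| d.name.to_s == by_step_name.to_s }
  end
end

p SketchLookup.lookup(:welcome).name  # => "welcome"
p SketchLookup.lookup("step_2").name  # => "step_2"
p SketchLookup.lookup(:missing)       # => nil
```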

.step(name = nil, wait: nil, after: nil, before_step: nil, after_step: nil, **additional_step_definition_options, &blk) ⇒ StepperMotor::Step

Defines a step in the journey. Steps are stacked top to bottom and get performed in sequence.

Parameters:

  • name (String, nil) (defaults to: nil)

    the name of the step. If none is provided, a name will be automatically generated based on the position of the step in the list of step_definitions. The name can also be used to call a method on the Journey instead of calling the provided block.

  • wait (Float, #to_f, ActiveSupport::Duration) (defaults to: nil)

    the amount of time this step should wait before getting performed. When the journey gets scheduled, the triggering job is going to be delayed by this amount of time, and the next_step_to_be_performed_at attribute will be set to the current time plus the wait duration. Mutually exclusive with after:.

  • after (Float, #to_f, ActiveSupport::Duration) (defaults to: nil)

    the amount of time this step should wait before getting performed, including all the previous waits. This allows you to set the wait time based on the time elapsed since the journey started, as opposed to the time since the previous step completed. When the journey gets scheduled, the triggering job is going to be delayed by this amount of time minus the wait values of the preceding steps, and the next_step_to_be_performed_at attribute will be set to the current time plus that remaining delay. The after value gets converted into the wait value and passed to the step definition. Mutually exclusive with wait:.

  • before_step (String, Symbol, nil) (defaults to: nil)

    the name of the step before which this step should be inserted. This allows you to control the order of steps by inserting a step before a specific existing step. The step name can be provided as a string or symbol. Mutually exclusive with after_step:.

  • after_step (String, Symbol, nil) (defaults to: nil)

    the name of the step after which this step should be inserted. This allows you to control the order of steps by inserting a step after a specific existing step. The step name can be provided as a string or symbol. Mutually exclusive with before_step:.

  • on_exception (Symbol)

    See Step#on_exception

  • skip_if (TrueClass, FalseClass, Symbol, Proc)

    condition to check before performing the step. If a symbol is provided, it will call the method on the Journey. If a block is provided, it will be executed with the Journey as context. The step will be skipped if the condition returns a truthy value.

  • if (TrueClass, FalseClass, Symbol, Proc)

    condition to check before performing the step. If a symbol is provided, it will call the method on the Journey. If a block is provided, it will be executed with the Journey as context. The step will be performed if the condition returns a truthy value, and skipped otherwise. Inverse of skip_if.

  • additional_step_definition_options (Hash)

    Any remaining options get passed to StepperMotor::Step.new as keyword arguments.
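
The relationship between wait: and after: comes down to simple arithmetic: after: is measured from the start of the journey, so the waits of all previously defined steps are subtracted from it. Below is a minimal standalone sketch of that conversion using plain seconds. The helper sketch_wait_for and the constant SKETCH_DAY are hypothetical, and the sketch raises ArgumentError where the library raises StepConfigurationError.

```ruby
SKETCH_DAY = 86_400 # seconds in a day

# Returns the wait (in seconds) for a new step, given the waits of the
# steps that were defined before it.
def sketch_wait_for(previous_waits, wait: nil, after: nil)
  raise ArgumentError, "Either wait: or after: can be specified, but not both" if wait && after

  computed = if after
    after - previous_waits.sum # time since journey start minus earlier waits
  else
    wait || 0                  # no timing options means "run right away"
  end
  raise ArgumentError, "wait: cannot be negative, but computed was #{computed}s" if computed.negative?
  computed
end

waits = []
waits << sketch_wait_for(waits, wait: 3 * SKETCH_DAY)  # 3 days after the journey starts
waits << sketch_wait_for(waits, after: 5 * SKETCH_DAY) # 5 days after the start, so 2 days after the previous step
waits << sketch_wait_for(waits)                        # immediately after the previous step

p waits # => [259200, 172800, 0]
```

An after: value smaller than the waits already accumulated would produce a negative wait, which is why that case is rejected.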

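Returns:

  • (StepperMotor::Step)

    the step definition that was created

Raises:

  • (StepConfigurationError)

    if the step options are invalid or conflict with each other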



# File 'lib/stepper_motor/journey.rb', line 98

def self.step(name = nil, wait: nil, after: nil, before_step: nil, after_step: nil, **additional_step_definition_options, &blk)
  # Handle the if: alias for backward compatibility
  if additional_step_definition_options.key?(:if) && additional_step_definition_options.key?(:skip_if)
    raise StepConfigurationError, "Either skip_if: or if: can be specified, but not both"
  end
  if additional_step_definition_options.key?(:if)
    # Convert if: to skip_if:
    additional_step_definition_options[:skip_if] = StepperMotor::Conditional.new(additional_step_definition_options.delete(:if), negate: true)
  end

  # Validate before_step and after_step parameters
  if before_step && after_step
    raise StepConfigurationError, "Either before_step: or after_step: can be specified, but not both"
  end

  # Validate that referenced steps exist
  if before_step
    referenced_step = lookup_step_definition(before_step)
    unless referenced_step
      raise StepConfigurationError, "Step named #{before_step.inspect} not found for before_step: parameter"
    end
  end

  if after_step
    referenced_step = lookup_step_definition(after_step)
    unless referenced_step
      raise StepConfigurationError, "Step named #{after_step.inspect} not found for after_step: parameter"
    end
  end

  wait = if wait && after&.respond_to?(:to_f)
    raise StepConfigurationError, "Either wait: or after: can be specified, but not both"
  elsif !wait && (!after || !after.respond_to?(:to_f))
    0
  elsif after&.respond_to?(:to_f)
    accumulated = step_definitions.map(&:wait).sum
    after - accumulated
  else
    wait
  end
  raise StepConfigurationError, "wait: cannot be negative, but computed was #{wait}s" if wait.negative?

  if name.blank? && blk.blank?
    raise StepConfigurationError, <<~MSG
      Step #{step_definitions.length + 1} of #{self} has no explicit name,
      and no block with step definition has been provided. Without a name the step
      must be defined with a block to execute. If you want an instance method of
      this Journey to be used as the step, pass the name of the method as the name of the step.
    MSG
  end

  name ||= "step_%d" % (step_definitions.length + 1)
  name = name.to_s

  known_step_names = step_definitions.map(&:name)
  raise StepConfigurationError, "Step named #{name.inspect} already defined" if known_step_names.include?(name)

  # Create the step definition
  step_definition = StepperMotor::Step.new(name: name, wait: wait, **additional_step_definition_options, &blk)

  # Determine insertion position based on before_step or after_step parameters
  if before_step
    target_step = lookup_step_definition(before_step)
    target_index = step_definitions.index(target_step)
    new_step_definitions = step_definitions.dup
    new_step_definitions.insert(target_index, step_definition)
    self.step_definitions = new_step_definitions
  elsif after_step
    target_step = lookup_step_definition(after_step)
    target_index = step_definitions.index(target_step)
    new_step_definitions = step_definitions.dup
    new_step_definitions.insert(target_index + 1, step_definition)
    self.step_definitions = new_step_definitions
  else
    # Default behavior: append to the end
    self.step_definitions = step_definitions + [step_definition]
  end

  step_definition
end
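
The insertion logic for before_step: and after_step: can be illustrated on a plain array of step names. This is a standalone sketch under simplifying assumptions: sketch_insert_step is a hypothetical helper, steps are represented by their names only, and ArgumentError stands in for StepConfigurationError.

```ruby
# Returns a new list of step names with new_name inserted.
# The input list is never mutated, mirroring the dup-then-assign approach.
def sketch_insert_step(names, new_name, before_step: nil, after_step: nil)
  raise ArgumentError, "Either before_step: or after_step: can be specified, but not both" if before_step && after_step

  anchor = before_step || after_step
  return names + [new_name] unless anchor # default: append to the end

  index = names.index(anchor.to_s)
  raise ArgumentError, "Step named #{anchor.inspect} not found" unless index

  index += 1 if after_step # insert behind the anchor instead of in front of it
  names.dup.insert(index, new_name)
end

names = %w[welcome reminder close_account]

p sketch_insert_step(names, "survey", after_step: :welcome)
# => ["welcome", "survey", "reminder", "close_account"]
p sketch_insert_step(names, "last_call", before_step: "close_account")
# => ["welcome", "reminder", "last_call", "close_account"]
p sketch_insert_step(names, "goodbye")
# => ["welcome", "reminder", "close_account", "goodbye"]
```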

.step_definitions_following(step_definition) ⇒ Array<StepperMotor::Step>

Returns all step definitions that follow the given step in the journey.

Parameters:

  • step_definition (StepperMotor::Step)

    the step definition to look for

Returns:

  • (Array<StepperMotor::Step>)

    the following steps, or an empty array if this is the last step or the step is not defined in this journey



# File 'lib/stepper_motor/journey.rb', line 192

def self.step_definitions_following(step_definition)
  current_index = step_definitions.index(step_definition)
  return [] unless current_index
  step_definitions[(current_index + 1)..]
end
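
The edge cases of this method follow from how Ruby slices arrays with an endless range: slicing from the index just past the last element yields an empty array rather than nil. A short standalone sketch on plain strings, where sketch_following is a hypothetical helper:

```ruby
# Returns everything after item in list, or [] when item is last or absent.
def sketch_following(list, item)
  index = list.index(item)
  return [] unless index
  list[(index + 1)..] # an endless range starting at list.length gives []
end

steps = %w[welcome reminder close_account]

p sketch_following(steps, "welcome")       # => ["reminder", "close_account"]
p sketch_following(steps, "close_account") # => []
p sketch_following(steps, "unknown")       # => []
```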

Instance Method Details

#after_locking_for_step(step_name) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 413

def after_locking_for_step(step_name)
end

#after_performing_step_with_exception(step_name, exception) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 416

def after_performing_step_with_exception(step_name, exception)
end

#after_performing_step_without_exception(step_name) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 422

def after_performing_step_without_exception(step_name)
end

#before_step_starts(step_name) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 419

def before_step_starts(step_name)
end
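
The four methods above are empty by default, which suggests they are meant to be overridden in Journey subclasses as hooks. Judging by the perform_next_step! source on this page, before_step_starts runs before the step body and one of the two after_performing_step_* hooks runs afterwards, depending on whether the step raised. The following standalone sketch imitates that call order. SketchRunner and SketchAuditedRunner are hypothetical classes, and after_locking_for_step is left out for brevity.

```ruby
class SketchRunner
  # No-op hooks, meant to be overridden in subclasses.
  def before_step_starts(step_name)
  end

  def after_performing_step_without_exception(step_name)
  end

  def after_performing_step_with_exception(step_name, exception)
  end

  # Runs the given block as a "step", calling the hooks around it.
  def run_step(step_name)
    rescued = nil
    before_step_starts(step_name)
    begin
      yield
    rescue => e
      rescued = e # remember the exception, re-raise it after the hooks ran
    end
  ensure
    if rescued
      after_performing_step_with_exception(step_name, rescued)
      raise rescued
    else
      after_performing_step_without_exception(step_name)
    end
  end
end

class SketchAuditedRunner < SketchRunner
  attr_reader :events

  def initialize
    @events = []
  end

  def before_step_starts(step_name)
    @events << [:started, step_name]
  end

  def after_performing_step_without_exception(step_name)
    @events << [:completed, step_name]
  end

  def after_performing_step_with_exception(step_name, exception)
    @events << [:failed, step_name, exception.message]
  end
end

runner = SketchAuditedRunner.new
runner.run_step("step_1") { :ok }
p runner.events # => [[:started, "step_1"], [:completed, "step_1"]]
```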

#cancel_if_conditions ⇒ Array<StepperMotor::Conditional>

Returns the cancel_if conditions defined for this journey class.

Returns:

  • (Array<StepperMotor::Conditional>)



# File 'lib/stepper_motor/journey.rb', line 51

class_attribute :cancel_if_conditions, default: []

#logger ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 402

def logger
  if (logger_from_parent = super)
    tag = [self.class.to_s, to_param].join(":")
    tag << " at " << @current_step_definition.name if @current_step_definition
    logger_from_parent.tagged(tag)
  else
    # Furnish a "null logger"
    ActiveSupport::Logger.new(nil)
  end
end

#lookup_step_definition(by_step_name) ⇒ Object

Alias for the class method, for brevity



# File 'lib/stepper_motor/journey.rb', line 208

def lookup_step_definition(by_step_name)
  self.class.lookup_step_definition(by_step_name)
end

#perform_next_step!(idempotency_key: nil) ⇒ void

This method returns an undefined value.

Performs the next step in the journey. Will check whether any other process has performed the step already and whether the record is unchanged, and will then lock it and set the state to 'performing'.

After setting the state, it will determine the next step to perform, and perform it. Depending on the outcome of the step another PerformStepJob may get enqueued. If the journey ends here, the journey record will set its state to 'finished'.

Parameters:

  • idempotency_key (String, nil) (defaults to: nil)

    If provided, the step will only be performed if the idempotency key matches the current idempotency key. This ensures that only the triggering job that was scheduled for this step can trigger the step, and not any other.



# File 'lib/stepper_motor/journey.rb', line 257

def perform_next_step!(idempotency_key: nil)
  # Make sure we can't start running the same step of the same journey twice
  next_step_name_before_locking = next_step_name
  with_lock do
    # Make sure no other worker has snatched this journey and made steps instead of us
    return unless ready? && next_step_name == next_step_name_before_locking
    # Check idempotency key if both are present
    return if idempotency_key && idempotency_key != self.idempotency_key

    performing!
    after_locking_for_step(next_step_name)
  end

  # Check cancel_if conditions after setting state to performing
  if cancel_if_conditions.any? { |conditional| conditional.satisfied_by?(self) }
    logger.info { "cancel_if condition satisfied, canceling journey" }
    cancel!
    return
  end

  current_step_name = next_step_name

  if current_step_name
    logger.debug { "preparing to perform step #{current_step_name}" }
  else
    logger.debug { "no next step - finishing journey" }
    # If there is no step set - just terminate the journey
    return finished! unless current_step_name
  end

  before_step_starts(current_step_name)

  # Recover the step definition
  @current_step_definition = lookup_step_definition(current_step_name)

  unless @current_step_definition
    logger.debug { "no definition for #{current_step_name} - finishing journey" }
    return finished!
  end

  # If we tried to run the step but it is not yet time to do so,
  # enqueue a new job to perform it and stop
  if next_step_to_be_performed_at > Time.current
    logger.warn { "tried to perform #{current_step_name} prematurely" }
    schedule!
    return ready!
  end

  # Perform the actual step
  increment!(:steps_entered)
  logger.debug { "entering step #{current_step_name}" }

  # The flow control for reattempt! and cancel! happens inside perform_in_context_of
  ex_rescued_at_perform = nil
  begin
    @current_step_definition.perform_in_context_of(self)
  rescue => e
    ex_rescued_at_perform = e
    logger.debug { "#{e} raised during #{@current_step_definition.name}, will be re-raised after" }
  end

  # By the end of the step the Journey must either be untouched or saved
  if changed?
    raise StepperMotor::JourneyNotPersisted, <<~MSG
      #{self} had its attributes changed but was not saved inside step #{current_step_name.inspect}
      this means that the subsequent execution (which may be done asynchronously) is likely to see
      a stale Journey, and will execute incorrectly. If you mutate the Journey inside
      of a step, make sure to call `save!` or use methods that save in-place
      (such as `increment!`).
    MSG
  end

  if ex_rescued_at_perform
    logger.warn { "performed #{current_step_name}, #{ex_rescued_at_perform} was raised" }
  else
    increment!(:steps_completed)
    logger.debug { "performed #{current_step_name} without exceptions" }
  end

  if paused? || canceled?
    # The step made arrangements regarding how we should continue, nothing to do
    logger.info { "has been #{state} inside #{current_step_name}" }
  elsif @reattempt_after
    # The step asked the actions to be attempted at a later time
    logger.info { "will reattempt #{current_step_name} in #{@reattempt_after} seconds" }
    set_next_step_and_enqueue(@current_step_definition, wait: @reattempt_after)
    ready!
  elsif @skip_current_step
    # The step asked to be skipped
    next_step_definition = self.class.step_definitions_following(@current_step_definition).first

    if next_step_definition
      # There are more steps after this one - schedule the next step
      logger.info { "skipping current step #{current_step_name}, will continue to #{next_step_definition.name}" }
      set_next_step_and_enqueue(next_step_definition)
      ready!
    else
      # This is the last step - finish the journey
      logger.info { "skipping current step #{current_step_name}, finishing journey" }
      finished!
      update!(previous_step_name: current_step_name, next_step_name: nil)
    end
  elsif finished?
    logger.info { "was marked finished inside the step" }
    update!(previous_step_name: current_step_name, next_step_name: nil)
  elsif (next_step_definition = self.class.step_definitions_following(@current_step_definition).first)
    logger.info { "will continue to #{next_step_definition.name}" }
    set_next_step_and_enqueue(next_step_definition)
    ready!
  else
    logger.info { "has finished" } # The hero's journey is complete
    finished!
    update!(previous_step_name: current_step_name, next_step_name: nil)
  end
ensure
  # The instance variables must not be present if `perform_next_step!` gets called
  # on this same object again. This will be the case if the steps are performed inline
  # and not via background jobs (which reload the model). This should actually be solved
  # using some object that contains the state of the action later, but for now - the dirty approach is fine.
  @reattempt_after = nil
  @skip_current_step = nil
  @current_step_definition = nil
  # Re-raise the exception, now that we have persisted the Journey according to the recovery policy
  if ex_rescued_at_perform
    after_performing_step_with_exception(current_step_name, ex_rescued_at_perform) if current_step_name
    raise ex_rescued_at_perform
  elsif current_step_name
    after_performing_step_without_exception(current_step_name)
  end
end
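
The idempotency key check can be illustrated in isolation: each time a step is scheduled a fresh key is issued, so a job still carrying an older key becomes a no-op. This is a minimal standalone sketch, not the library's code. SketchJourneyRecord is a hypothetical class, there is no database or locking involved, and SecureRandom.hex from the Ruby standard library is used where the source calls SecureRandom.base36.

```ruby
require "securerandom"

class SketchJourneyRecord
  attr_reader :idempotency_key, :steps_performed

  def initialize
    @steps_performed = 0
    reschedule!
  end

  # Every (re)scheduling issues a fresh key for the triggering job to carry.
  def reschedule!
    @idempotency_key = SecureRandom.hex(8)
  end

  # Returns true if the step ran, false if the call was ignored.
  def perform_next_step!(idempotency_key: nil)
    # Only compare when the caller supplied a key, as in the method above.
    return false if idempotency_key && idempotency_key != @idempotency_key

    @steps_performed += 1
    reschedule! # the next step gets its own key
    true
  end
end

journey = SketchJourneyRecord.new
key_of_first_job = journey.idempotency_key

p journey.perform_next_step!(idempotency_key: key_of_first_job) # => true
p journey.perform_next_step!(idempotency_key: key_of_first_job) # => false, the key is stale now
p journey.steps_performed                                       # => 1
```

A call without a key is always let through, which matches the "if provided" wording of the parameter description.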

#schedule! ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 425

def schedule!
  StepperMotor.scheduler.schedule(self)
end

#set_next_step_and_enqueue(next_step_definition, wait: nil) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 395

def set_next_step_and_enqueue(next_step_definition, wait: nil)
  wait ||= next_step_definition.wait
  next_idempotency_key = SecureRandom.base36(16)
  update!(previous_step_name: next_step_name, next_step_name: next_step_definition.name, next_step_to_be_performed_at: Time.current + wait, idempotency_key: next_idempotency_key)
  schedule!
end

#step_definitions ⇒ Object

Alias for the class attribute, for brevity

See Also:



# File 'lib/stepper_motor/journey.rb', line 48

class_attribute :step_definitions, default: []

#time_remaining_until_final_step ⇒ ActiveSupport::Duration

Returns:

  • (ActiveSupport::Duration)


# File 'lib/stepper_motor/journey.rb', line 389

def time_remaining_until_final_step
  subsequent_steps = @current_step_definition ? self.class.step_definitions_following(@current_step_definition) : step_definitions
  seconds_remaining = subsequent_steps.map { |definition| definition.wait.to_f }.sum
  seconds_remaining.seconds # Convert to ActiveSupport::Duration
end
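
The calculation sums the wait values of the steps that come after the current one, or of all steps when no step is currently being performed. A standalone sketch with plain seconds instead of ActiveSupport::Duration is shown here; SketchTimedStep and sketch_time_remaining are hypothetical names.

```ruby
# Hypothetical stand-in for a step definition with a name and a wait in seconds.
SketchTimedStep = Struct.new(:name, :wait)

# Sums the waits of the steps after current, or of all steps if current is nil.
def sketch_time_remaining(definitions, current = nil)
  remaining = if current
    index = definitions.index(current)
    index ? definitions[(index + 1)..] : []
  else
    definitions
  end
  remaining.sum(0.0) { |definition| definition.wait.to_f }
end

day = 86_400
steps = [
  SketchTimedStep.new("invite", 0),
  SketchTimedStep.new("remind", 3 * day),
  SketchTimedStep.new("close_account", 3 * day)
]

p sketch_time_remaining(steps)           # => 518400.0 (six days)
p sketch_time_remaining(steps, steps[1]) # => 259200.0 (three days)
p sketch_time_remaining(steps, steps[2]) # => 0.0
```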