Class: StepperMotor::Journey

Inherits:
ActiveRecord::Base
  • Object
Defined in:
lib/stepper_motor/journey.rb

Overview

A Journey is the main building block of StepperMotor. You create a journey to guide a particular model (“hero”) through a sequence of steps. Any of your models can be the hero, and a hero can have multiple Journeys. To create your own Journey, subclass the StepperMotor::Journey class and define your steps. For example, a drip mail campaign can look like this:

class ResubscribeCampaign < StepperMotor::Journey
  step do
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    ReinviteMailer.with(recipient: hero).deliver_later
  end

  step wait: 3.days do
    cancel! if hero.active?
    hero.close_account!
  end
end

Creating a record for the Journey (just using create!) will instantly send your hero on their way:

ResubscribeCampaign.create!(hero: )

To stop the journey forcibly, delete it from your database - or call cancel! within any of the steps.

Constant Summary

STATES =
%w[ready performing canceled finished]

Class Method Details

.lookup_step_definition(by_step_name) ⇒ StepperMotor::Step?

Returns the Step object for a named step. This is used when performing a step, but can also be useful in other contexts.

Parameters:

  • by_step_name (Symbol, String)

    the name of the step to find

Returns:

  • (StepperMotor::Step, nil)

    the matching step definition, or nil if no step has that name

# File 'lib/stepper_motor/journey.rb', line 93

def self.lookup_step_definition(by_step_name)
  step_definitions.find { |d| d.name.to_s == by_step_name.to_s }
end
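
The name matching can be sketched in plain Ruby without StepperMotor loaded. `StepSketch`, `STEP_SKETCHES` and `lookup` are hypothetical stand-ins for `StepperMotor::Step`, the `step_definitions` array and the class method. The sketch only illustrates that both sides are compared via `to_s`, so a Symbol and a String find the same step, and an unknown name yields nil:

```ruby
# Hypothetical stand-in for StepperMotor::Step, carrying only a name.
StepSketch = Struct.new(:name)

STEP_SKETCHES = [StepSketch.new("step_1"), StepSketch.new("send_reminder")].freeze

# Mirrors the comparison in the method above: both sides are stringified.
def lookup(definitions, by_step_name)
  definitions.find { |d| d.name.to_s == by_step_name.to_s }
end

by_symbol = lookup(STEP_SKETCHES, :send_reminder)
by_string = lookup(STEP_SKETCHES, "send_reminder")
missing = lookup(STEP_SKETCHES, :no_such_step)
```

Because a missing step comes back as nil rather than raising, callers need to handle that case themselves.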

.step(name = nil, wait: nil, after: nil, &blk) ⇒ Object

Defines a step in the journey. Steps are stacked top to bottom and get performed in sequence.



# File 'lib/stepper_motor/journey.rb', line 61

def self.step(name = nil, wait: nil, after: nil, &blk)
  wait = if wait && after
    raise StepConfigurationError, "Either wait: or after: can be specified, but not both"
  elsif !wait && !after
    0
  elsif after
    accumulated = step_definitions.map(&:wait).sum
    after - accumulated
  else
    wait
  end
  raise StepConfigurationError, "wait: cannot be negative, but computed was #{wait}s" if wait.negative?
  name ||= "step_%d" % (step_definitions.length + 1)
  name = name.to_s

  known_step_names = step_definitions.map(&:name)
  raise StepConfigurationError, "Step named #{name.inspect} already defined" if known_step_names.include?(name)

  # Create the step definition
  step_definition = StepperMotor::Step.new(name: name, wait: wait, seq: step_definitions.length, &blk)

  # As per Rails docs: you need to be aware when using class_attribute with mutable structures
  # as Array or Hash. In such cases, you don’t want to do changes in place. Instead use setters.
  # See https://apidock.com/rails/v7.1.3.2/Class/class_attribute
  self.step_definitions = step_definitions + [step_definition]
end
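
The difference between `wait:` and `after:` is easiest to see with numbers. The sketch below is plain Ruby and does not load StepperMotor: `resolve_wait` is a hypothetical helper that mirrors the branch in the method above, durations are plain seconds instead of `ActiveSupport::Duration`, and `ArgumentError` stands in for `StepConfigurationError`:

```ruby
# Hypothetical helper mirroring the wait:/after: branch of `step` above.
# All durations are plain seconds so the sketch runs without ActiveSupport.
def resolve_wait(previous_waits, wait: nil, after: nil)
  raise ArgumentError, "Either wait: or after: can be specified, but not both" if wait && after
  resolved = if after
    after - previous_waits.sum # after: counts from the start of the journey
  else
    wait || 0                  # wait: counts from the previous step
  end
  raise ArgumentError, "wait: cannot be negative, but computed was #{resolved}s" if resolved.negative?
  resolved
end

DAY = 86_400
waits = []
waits << resolve_wait(waits)                 # first step, runs immediately
waits << resolve_wait(waits, wait: 3 * DAY)  # 3 days after the first step
waits << resolve_wait(waits, after: 5 * DAY) # 5 days after the journey began
```

Under these assumptions the third step ends up with a wait of two days, because three of its five days have already been consumed by the step before it. An `after:` value smaller than the accumulated waits would produce a negative wait and is rejected.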

Instance Method Details

#after_locking_for_step(step_name) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 255

def after_locking_for_step(step_name)
end

#after_step_completes(step_name) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 261

def after_step_completes(step_name)
end

#before_step_starts(step_name) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 258

def before_step_starts(step_name)
end

#cancel! ⇒ Object

A convenient way to end a hero’s journey. Imagine you enter a step where you are about to email a user an invitation to rejoin the platform, but they have already rejoined. You can then cancel their journey. Canceling bails out of the step’s block and sets the journey record to the canceled state.

Calling cancel! will abort the execution of the current step.



# File 'lib/stepper_motor/journey.rb', line 116

def cancel!
  canceled!
  throw :abort_step
end
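
The abort relies on Ruby's `throw`/`catch` pair, which the following plain-Ruby sketch isolates. `bail_out` is a hypothetical helper standing in for `cancel!` without the state change, and `log` only records which lines ran:

```ruby
log = []

# Hypothetical helper: throw unwinds through method calls, so the abort can
# originate from a method called by the step body rather than the body itself.
def bail_out
  throw :abort_step
end

catch(:abort_step) do
  log << "checked the hero"
  bail_out                 # stands in for cancel! (minus the state change)
  log << "sent the email"  # never reached
end

log << "runner continues"
```

Nothing after the `throw` executes inside the block, while the code following `catch` runs normally. This is why `cancel!` can be called from anywhere within a step.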

#logger ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 244

def logger
  if (logger_from_parent = super)
    tag = [self.class.to_s, to_param].join(":")
    tag << " at " << @current_step_definition.name if @current_step_definition
    logger_from_parent.tagged(tag)
  else
    # Furnish a "null logger"
    ActiveSupport::Logger.new(nil)
  end
end

#lookup_step_definition(by_step_name) ⇒ Object

Alias for the class method, for brevity



# File 'lib/stepper_motor/journey.rb', line 107

def lookup_step_definition(by_step_name)
  self.class.lookup_step_definition(by_step_name)
end

#perform_next_step! ⇒ void

This method returns an undefined value.

Performs the next step in the journey. Checks whether another process has already performed the step and whether the record is unchanged, then locks the record and sets its state to ‘performing’.

After setting the state, it will determine the next step to perform, and perform it. Depending on the outcome of the step another PerformStepJob may get enqueued. If the journey ends here, the journey record will set its state to ‘finished’.



# File 'lib/stepper_motor/journey.rb', line 140

def perform_next_step!
  # Make sure we can't start running the same step of the same journey twice
  next_step_name_before_locking = next_step_name
  with_lock do
    # Make sure no other worker has snatched this journey and made steps instead of us
    return unless ready? && next_step_name == next_step_name_before_locking
    performing!
    after_locking_for_step(next_step_name)
  end
  current_step_name = next_step_name

  if current_step_name
    logger.debug { "preparing to perform step #{current_step_name}" }
  else
    logger.debug { "no next step - finishing journey" }
    # If there is no step set - just terminate the journey
    return finished! unless current_step_name
  end

  before_step_starts(current_step_name)

  # Recover the step definition
  @current_step_definition = lookup_step_definition(current_step_name)

  unless @current_step_definition
    logger.debug { "no definition for #{current_step_name} - finishing journey" }
    return finished!
  end

  # If we tried to run the step but it is not yet time to do so,
  # enqueue a new job to perform it and stop
  if next_step_to_be_performed_at > Time.current
    logger.warn { "tried to perform #{current_step_name} prematurely" }
    schedule!
    return ready!
  end

  # Perform the actual step
  increment!(:steps_entered)
  logger.debug { "entering step #{current_step_name}" }

  catch(:abort_step) do
    instance_exec(&@current_step_definition)
  end

  # By the end of the step the Journey must either be untouched or saved
  if changed?
    raise StepperMotor::JourneyNotPersisted, <<~MSG
      #{self} had its attributes changed but was not saved inside step #{current_step_name.inspect}
      this means that the subsequent execution (which may be done asynchronously) is likely to see
      a stale Journey, and will execute incorrectly. If you mutate the Journey inside
      of a step, make sure to call `save!` or use methods that save in-place
      (such as `increment!`).
    MSG
  end

  increment!(:steps_completed)
  logger.debug { "completed #{current_step_name} without exceptions" }

  if canceled?
    # The step aborted the journey, nothing to do
    logger.info { "has been canceled inside #{current_step_name}" }
  elsif @reattempt_after
    # The step asked the actions to be attempted at a later time
    logger.info { "will reattempt #{current_step_name} in #{@reattempt_after} seconds" }
    update!(previous_step_name: current_step_name, next_step_name: current_step_name, next_step_to_be_performed_at: Time.current + @reattempt_after)
    schedule!
    ready!
  elsif finished?
    logger.info { "was marked finished inside the step" }
    update!(previous_step_name: current_step_name, next_step_name: nil)
  elsif (next_step_definition = step_definitions[@current_step_definition.seq + 1])
    logger.info { "will continue to #{next_step_definition.name}" }
    set_next_step_and_enqueue(next_step_definition)
    ready!
  else
    # The hero's journey is complete
    logger.info { "journey completed" }
    finished!
    update!(previous_step_name: current_step_name, next_step_name: nil)
  end
ensure
  # The instance variables must not be present if `perform_next_step!` gets called
  # on this same object again. This will be the case if the steps are performed inline
  # and not via background jobs (which reload the model)
  @reattempt_after = nil
  @current_step_definition = nil
  after_step_completes(current_step_name) if current_step_name
end
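
The branch at the end of the method decides what happens to the journey once the step block has run. As a reading aid, it can be condensed into a plain-Ruby function. `outcome_after_step` and the symbols it returns are hypothetical names; only the order of the checks is taken from the code above:

```ruby
# Hypothetical summary of the final branch of perform_next_step!.
# Returns a symbol naming what happens to the journey after a step ran.
def outcome_after_step(canceled:, reattempt_after:, finished:, has_next_step:)
  if canceled
    :stay_canceled          # the step called cancel!, nothing is scheduled
  elsif reattempt_after
    :reschedule_same_step   # the step called reattempt!
  elsif finished
    :stay_finished          # the step marked the journey finished itself
  elsif has_next_step
    :schedule_next_step     # normal progression
  else
    :finish_journey         # that was the last step
  end
end

normal = outcome_after_step(canceled: false, reattempt_after: nil, finished: false, has_next_step: true)
last = outcome_after_step(canceled: false, reattempt_after: nil, finished: false, has_next_step: false)
retried = outcome_after_step(canceled: false, reattempt_after: 60, finished: false, has_next_step: true)
```

The checks are evaluated in this order, so a canceled journey is never rescheduled even when a later step exists.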

#reattempt!(wait: nil) ⇒ Object

Inside a step it is possible to ask StepperMotor to retry the step at a later point in time. Maybe now is an inconvenient moment (are you about to send a push notification at 3AM perhaps?). The wait: parameter specifies how long to defer the reattempt. Reattempting restarts the step from the beginning, so the step should be idempotent.

Calling reattempt! will abort the execution of the current step.



# File 'lib/stepper_motor/journey.rb', line 126

def reattempt!(wait: nil)
  # The default `wait` is the one for the step definition
  @reattempt_after = wait || @current_step_definition.wait || 0
  throw :abort_step
end
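
Two aspects of this method can be sketched in plain Ruby: how the delay falls back when `wait:` is omitted, and what an idempotent step body might look like. `reattempt_delay`, `notified` and `step_body` are hypothetical names, and in a real journey the "already done" marker would have to be persisted (for example on the hero) rather than kept in a local array:

```ruby
# Hypothetical helper mirroring the fallback in reattempt!: an explicit wait:
# wins, then the wait of the step definition, then zero.
def reattempt_delay(explicit_wait, step_wait)
  explicit_wait || step_wait || 0
end

# A step body written to be idempotent: running it again after a reattempt
# must not repeat work that already happened.
notified = []
step_body = lambda do |hero_id|
  next :already_done if notified.include?(hero_id)
  notified << hero_id
  :sent
end

first_run = step_body.call(42)
second_run = step_body.call(42) # simulates the step restarting from the top
```

The second run is a no-op because the guard at the top of the body detects the earlier work, which is the property a reattempted step needs.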

#schedule! ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 264

def schedule!
  StepperMotor.scheduler.schedule(self)
end

#set_next_step_and_enqueue(next_step_definition) ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 238

def set_next_step_and_enqueue(next_step_definition)
  wait = next_step_definition.wait
  update!(previous_step_name: next_step_name, next_step_name: next_step_definition.name, next_step_to_be_performed_at: Time.current + wait)
  schedule!
end

#step_definitions ⇒ Object

Alias for the class attribute, for brevity




# File 'lib/stepper_motor/journey.rb', line 42

class_attribute :step_definitions, default: []

#time_remaining_until_final_step ⇒ ActiveSupport::Duration

Returns:

  • (ActiveSupport::Duration)


# File 'lib/stepper_motor/journey.rb', line 231

def time_remaining_until_final_step
  current_step_seq = @current_step_definition&.seq || -1
  subsequent_steps = step_definitions.select { |definition| definition.seq > current_step_seq }
  seconds_remaining = subsequent_steps.map { |definition| definition.wait.to_f }.sum
  seconds_remaining.seconds # Convert to ActiveSupport::Duration
end
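
A worked example of the summation, in plain Ruby with seconds instead of `ActiveSupport::Duration`. `StepTiming`, `TIMINGS` and `seconds_until_final_step` are hypothetical stand-ins; the waits are those of the four-step campaign from the overview:

```ruby
# Hypothetical stand-in for StepperMotor::Step with just the fields used here.
StepTiming = Struct.new(:seq, :wait)

DAY_SECONDS = 86_400
TIMINGS = [
  StepTiming.new(0, 0),
  StepTiming.new(1, 3 * DAY_SECONDS),
  StepTiming.new(2, 3 * DAY_SECONDS),
  StepTiming.new(3, 3 * DAY_SECONDS)
].freeze

# Sums the waits of every step after current_seq; -1 means no step is running.
def seconds_until_final_step(definitions, current_seq = -1)
  definitions.select { |d| d.seq > current_seq }.sum { |d| d.wait.to_f }
end

before_start = seconds_until_final_step(TIMINGS)     # all four waits: 9 days
during_second = seconds_until_final_step(TIMINGS, 1) # steps 2 and 3: 6 days
```

As the method above reads, `@current_step_definition` is only set while a step is being performed, so calling it outside of a step appears to return the total of all waits rather than the time left from the journey's current position.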

#to_global_id ⇒ Object



# File 'lib/stepper_motor/journey.rb', line 268

def to_global_id
  # This gets included into ActiveModel during Rails bootstrap,
  # for now do this manually
  GlobalID.create(self, app: "stepper-motor")
end