Code Coverage

| Item | Classes and Traits | Functions and Methods | CRAP | Lines |
|---|---|---|---|---|
| Total | 0.00% (0 / 1) | 14.29% (2 / 14) | | 51.46% (106 / 206) |
| MigrateExecutable | 0.00% (0 / 1) | 14.29% (2 / 14) | 501.73 | 51.46% (106 / 206) |
| __construct | | 0.00% (0 / 1) | 14.54 | 38.10% (8 / 21) |
| getSource | | 100.00% (1 / 1) | 2 | 100.00% (3 / 3) |
| getEventDispatcher | | 0.00% (0 / 1) | 2.15 | 66.67% (2 / 3) |
| import | | 0.00% (0 / 1) | 29.46 | 64.94% (50 / 77) |
| rollback | | 0.00% (0 / 1) | 72 | 0.00% (0 / 30) |
| processRow | | 0.00% (0 / 1) | 39.86 | 45.83% (11 / 24) |
| currentSourceIds | | 0.00% (0 / 1) | 2 | 0.00% (0 / 1) |
| saveMessage | | 100.00% (1 / 1) | 1 | 100.00% (2 / 2) |
| handleException | | 0.00% (0 / 1) | 6 | 0.00% (0 / 6) |
| checkStatus | | 0.00% (0 / 1) | 2.15 | 66.67% (2 / 3) |
| memoryExceeded | | 0.00% (0 / 1) | 4 | 96.55% (28 / 29) |
| getMemoryUsage | | 0.00% (0 / 1) | 2 | 0.00% (0 / 1) |
| attemptMemoryReclaim | | 0.00% (0 / 1) | 6 | 0.00% (0 / 5) |
| formatSize | | 0.00% (0 / 1) | 2 | 0.00% (0 / 1) |
| <?php | |
| /** | |
| * @file | |
| * Contains \Drupal\migrate\MigrateExecutable. | |
| */ | |
| namespace Drupal\migrate; | |
| use Drupal\Core\Utility\Error; | |
| use Drupal\Core\StringTranslation\StringTranslationTrait; | |
| use Drupal\migrate\Entity\MigrationInterface; | |
| use Drupal\migrate\Event\MigrateEvents; | |
| use Drupal\migrate\Event\MigrateImportEvent; | |
| use Drupal\migrate\Event\MigratePostRowSaveEvent; | |
| use Drupal\migrate\Event\MigratePreRowSaveEvent; | |
| use Drupal\migrate\Event\MigrateRollbackEvent; | |
| use Drupal\migrate\Event\MigrateRowDeleteEvent; | |
| use Drupal\migrate\Exception\RequirementsException; | |
| use Drupal\migrate\Plugin\MigrateIdMapInterface; | |
| use Symfony\Component\EventDispatcher\EventDispatcherInterface; | |
| /** | |
| * Defines a migrate executable class. | |
| */ | |
| class MigrateExecutable implements MigrateExecutableInterface { | |
use StringTranslationTrait;

/**
 * The configuration of the migration to do.
 *
 * @var \Drupal\migrate\Entity\MigrationInterface
 */
protected $migration;

/**
 * The message handler used to display feedback to the user.
 *
 * Assigned in the constructor from its $message argument. Declared here so
 * it is a regular property rather than one created dynamically at runtime.
 *
 * @var \Drupal\migrate\MigrateMessageInterface
 */
protected $message;

/**
 * Status of one row.
 *
 * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
 * STATUS_IMPORTED.
 *
 * @var int
 */
protected $sourceRowStatus;

/**
 * The ratio of the memory limit at which an operation will be interrupted.
 *
 * A falsy value (for example 0) disables the memory check entirely.
 *
 * @var float
 */
protected $memoryThreshold = 0.85;

/**
 * The PHP memory_limit expressed in bytes.
 *
 * @var int
 */
protected $memoryLimit;

/**
 * The source ID values of the row currently being processed.
 *
 * Populated by import() for every row and used as the key under which
 * saveMessage() stores messages in the ID map.
 *
 * @var array
 */
protected $sourceIdValues;

/**
 * An array of counts. Initially used for cache hit/miss tracking.
 *
 * @var array
 */
protected $counts = array();

/**
 * The object currently being constructed.
 *
 * @var \stdClass
 */
protected $destinationValues;

/**
 * The source.
 *
 * Lazily initialized by getSource().
 *
 * @var \Drupal\migrate\Plugin\MigrateSourceInterface
 */
protected $source;

/**
 * The current data row retrieved from the source.
 *
 * @var \stdClass
 */
protected $sourceValues;

/**
 * The event dispatcher.
 *
 * May be NULL until getEventDispatcher() is first called.
 *
 * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
 */
protected $eventDispatcher;
/**
 * Constructs a MigrateExecutable and verifies and sets the memory limit.
 *
 * The PHP memory_limit setting is converted to bytes. A value of -1 means
 * "no limit" and is stored as PHP_INT_MAX. A value that cannot be parsed, or
 * that evaluates to zero or less, is reported through the message handler
 * and also treated as unlimited.
 *
 * @param \Drupal\migrate\Entity\MigrationInterface $migration
 *   The migration to run.
 * @param \Drupal\migrate\MigrateMessageInterface $message
 *   The message to record.
 * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
 *   The event dispatcher.
 *
 * @throws \Drupal\migrate\MigrateException
 */
public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, EventDispatcherInterface $event_dispatcher = NULL) {
  $this->migration = $migration;
  $this->message = $message;
  $this->migration->getIdMap()->setMessage($message);
  $this->eventDispatcher = $event_dispatcher;

  // Record the memory limit in bytes.
  $limit = trim(ini_get('memory_limit'));
  if ($limit == '-1') {
    $this->memoryLimit = PHP_INT_MAX;
    return;
  }

  $bytes = NULL;
  if (is_numeric($limit)) {
    // A plain number is already a byte count.
    $bytes = $limit + 0;
  }
  else {
    // Shorthand notation: a number followed by a K, M or G suffix. The
    // number is separated from the suffix before multiplying so that no
    // arithmetic is ever performed on a non-numeric string.
    $multipliers = array(
      'k' => 1024,
      'm' => 1048576,
      'g' => 1073741824,
    );
    $unit = strtolower(substr($limit, -1));
    $number = trim(substr($limit, 0, -1));
    if (isset($multipliers[$unit]) && is_numeric($number)) {
      $bytes = $number * $multipliers[$unit];
    }
  }

  if ($bytes === NULL || $bytes <= 0) {
    // Report the offending setting before replacing it, so the message shows
    // what was actually configured. A non-positive limit is rejected as well
    // because it would make the usage ratio undefined.
    $this->message->display($this->t('Invalid PHP memory_limit @limit, setting to unlimited.',
      array('@limit' => $limit)));
    $bytes = PHP_INT_MAX;
  }
  $this->memoryLimit = $bytes;
}
/**
 * Returns the migration's source plugin.
 *
 * The plugin is fetched from the migration the first time it is requested
 * and kept on the object for subsequent calls.
 *
 * @return \Drupal\migrate\Plugin\MigrateSourceInterface
 *   The source.
 */
protected function getSource() {
  if (isset($this->source)) {
    return $this->source;
  }
  $this->source = $this->migration->getSourcePlugin();
  return $this->source;
}
/**
 * Gets the event dispatcher.
 *
 * Falls back to the 'event_dispatcher' service when none was supplied.
 *
 * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface
 *   The event dispatcher.
 */
protected function getEventDispatcher() {
  if ($this->eventDispatcher) {
    return $this->eventDispatcher;
  }
  $this->eventDispatcher = \Drupal::service('event_dispatcher');
  return $this->eventDispatcher;
}
/**
 * {@inheritdoc}
 */
public function import() {
  // Only begin the import operation if the migration is currently idle.
  if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
    $this->message->display($this->t('Migration @id is busy with another operation: @status',
      array(
        '@id' => $this->migration->id(),
        '@status' => $this->t($this->migration->getStatusLabel()),
      )), 'error');
    return MigrationInterface::RESULT_FAILED;
  }
  $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration));

  // Knock off migration if the requirements haven't been met.
  try {
    $this->migration->checkRequirements();
  }
  catch (RequirementsException $e) {
    $this->message->display(
      $this->t('Migration @id did not meet the requirements. @message @requirements', array(
        '@id' => $this->migration->id(),
        '@message' => $e->getMessage(),
        '@requirements' => $e->getRequirementsString(),
      )), 'error');
    return MigrationInterface::RESULT_FAILED;
  }

  $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
  $return = MigrationInterface::RESULT_COMPLETED;
  $source = $this->getSource();
  $id_map = $this->migration->getIdMap();

  // Make sure the very first row is saved with a defined status; without
  // this the status is only assigned after the first row has been handled.
  if (!isset($this->sourceRowStatus)) {
    $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
  }

  try {
    $source->rewind();
  }
  catch (\Exception $e) {
    $this->message->display(
      $this->t('Migration failed with source plugin exception: @e', array('@e' => $e->getMessage())), 'error');
    $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
    return MigrationInterface::RESULT_FAILED;
  }

  $destination = $this->migration->getDestinationPlugin();
  while ($source->valid()) {
    $row = $source->current();
    // Remember the current row's IDs so saveMessage() can key messages on it.
    $this->sourceIdValues = $row->getSourceIdValues();

    // Run the process pipeline; a failure or skip is recorded in the ID map
    // and prevents the row from being handed to the destination.
    try {
      $this->processRow($row);
      $save = TRUE;
    }
    catch (MigrateException $e) {
      $this->migration->getIdMap()->saveIdMapping($row, array(), $e->getStatus());
      $this->saveMessage($e->getMessage(), $e->getLevel());
      $save = FALSE;
    }
    catch (MigrateSkipRowException $e) {
      $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_IGNORED);
      $save = FALSE;
    }

    if ($save) {
      try {
        $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $row));
        $destination_id_values = $destination->import($row, $id_map->lookupDestinationId($this->sourceIdValues));
        $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $row, $destination_id_values));
        if ($destination_id_values) {
          // We do not save an idMap entry for config.
          if ($destination_id_values !== TRUE) {
            $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
          }
        }
        else {
          // The destination returned nothing: mark the row as failed and make
          // sure at least one message explains why.
          $id_map->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED);
          if (!$id_map->messageCount()) {
            $message = $this->t('New object was not saved, no error provided');
            $this->saveMessage($message);
            $this->message->display($message);
          }
        }
      }
      catch (MigrateException $e) {
        $this->migration->getIdMap()->saveIdMapping($row, array(), $e->getStatus());
        $this->saveMessage($e->getMessage(), $e->getLevel());
      }
      catch (\Exception $e) {
        $this->migration->getIdMap()->saveIdMapping($row, array(), MigrateIdMapInterface::STATUS_FAILED);
        $this->handleException($e);
      }
    }

    if ($high_water_property = $this->migration->get('highWaterProperty')) {
      $this->migration->saveHighWater($row->getSourceProperty($high_water_property['name']));
    }

    // Reset row properties. These live on the object, so they have to be
    // addressed through $this; unsetting same-named local variables would
    // leave them untouched.
    $this->sourceValues = NULL;
    $this->destinationValues = NULL;
    $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

    // Check for memory exhaustion.
    if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
      break;
    }

    // If anyone has requested we stop, return the requested result.
    if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
      $return = $this->migration->getInterruptionResult();
      $this->migration->clearInterruptionResult();
      break;
    }

    try {
      $source->next();
    }
    catch (\Exception $e) {
      $this->message->display(
        $this->t('Migration failed with source plugin exception: @e',
          array('@e' => $e->getMessage())), 'error');
      $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
      return MigrationInterface::RESULT_FAILED;
    }
  }

  $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration));
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
  return $return;
}
/**
 * {@inheritdoc}
 */
public function rollback() {
  // A rollback may only start while the migration is idle.
  if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
    $placeholders = [
      '@id' => $this->migration->id(),
      '@status' => $this->t($this->migration->getStatusLabel()),
    ];
    $this->message->display($this->t('Migration @id is busy with another operation: @status', $placeholders), 'error');
    return MigrationInterface::RESULT_FAILED;
  }

  // Let subscribers know that a rollback is about to start.
  $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration));

  // Start out assuming success; the loop below replaces this value when the
  // rollback is cut short.
  $result = MigrationInterface::RESULT_COMPLETED;
  $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
  $map = $this->migration->getIdMap();
  $destination_plugin = $this->migration->getDestinationPlugin();

  // Walk the ID map and try to roll back every mapped destination.
  foreach ($map as $current_row) {
    $destination_ids = $map->currentDestination();
    if ($destination_ids) {
      $mapping = $map->getRowByDestination($destination_ids);
      // Only rows flagged for deletion are removed from the destination.
      if ($mapping['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
        $this->getEventDispatcher()->dispatch(
          MigrateEvents::PRE_ROW_DELETE,
          new MigrateRowDeleteEvent($this->migration, $destination_ids)
        );
        $destination_plugin->rollback($destination_ids);
        $this->getEventDispatcher()->dispatch(
          MigrateEvents::POST_ROW_DELETE,
          new MigrateRowDeleteEvent($this->migration, $destination_ids)
        );
      }
      // The row has been dealt with, so drop it from the map.
      $map->deleteDestination($destination_ids);
    }

    // Stop when memory is running out.
    $result = $this->checkStatus();
    if ($result != MigrationInterface::RESULT_COMPLETED) {
      break;
    }

    // Stop when an interruption was requested, returning the requested
    // result.
    if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
      $result = $this->migration->getInterruptionResult();
      $this->migration->clearInterruptionResult();
      break;
    }
  }

  // A rollback that ran to completion resets the high water mark.
  if ($result == MigrationInterface::RESULT_COMPLETED) {
    $this->migration->saveHighWater(NULL);
  }

  // Tell subscribers the rollback attempt is over.
  $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);

  return $result;
}
/**
 * {@inheritdoc}
 */
public function processRow(Row $row, array $process = NULL, $value = NULL) {
  foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
    // Whether the value flowing through the pipeline is currently a list of
    // values that must be transformed one by one.
    $multiple = FALSE;
    /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */
    foreach ($plugins as $plugin) {
      $definition = $plugin->getPluginDefinition();
      // Many plugins expect a scalar value but the current value of the
      // pipeline might be multiple scalars (this is set by the previous
      // plugin) and in this case the current value needs to be iterated
      // and each scalar separately transformed. A definition without the
      // 'handle_multiples' key is treated as not handling multiples.
      if ($multiple && empty($definition['handle_multiples'])) {
        $new_value = array();
        if (!is_array($value)) {
          // An object that cannot be cast to string would make sprintf()
          // fail and hide the real problem, so report its class instead.
          $described = (is_object($value) && !method_exists($value, '__toString')) ? get_class($value) : $value;
          throw new MigrateException(sprintf('Pipeline failed for destination %s: %s got instead of an array,', $destination, $described));
        }
        $break = FALSE;
        foreach ($value as $scalar_value) {
          try {
            $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
          }
          catch (MigrateSkipProcessException $e) {
            // Keep transforming the remaining items, but stop the pipeline
            // for this destination afterwards.
            $break = TRUE;
          }
        }
        $value = $new_value;
        if ($break) {
          break;
        }
      }
      else {
        try {
          $value = $plugin->transform($value, $this, $row, $destination);
        }
        catch (MigrateSkipProcessException $e) {
          // Abandon the rest of the pipeline for this destination.
          break;
        }
        $multiple = $multiple || $plugin->multiple();
      }
    }
    // No plugins or no value means do not set.
    if ($plugins && !is_null($value)) {
      $row->setDestinationProperty($destination, $value);
    }
    // Reset the value so the next destination starts with an empty pipeline.
    $value = NULL;
  }
}
/**
 * Returns the IDs of the record the source is currently positioned on.
 *
 * @return array
 *   The current source IDs.
 */
protected function currentSourceIds() {
  $source = $this->getSource();
  return $source->getCurrentIds();
}
/**
 * {@inheritdoc}
 */
public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
  // Messages are stored in the ID map, keyed by the current row's source IDs.
  $id_map = $this->migration->getIdMap();
  $id_map->saveMessage($this->sourceIdValues, $message, $level);
}
/**
 * Saves and displays the details of an exception.
 *
 * The message is extended with the file and line reported for the exception.
 *
 * @param \Exception $exception
 *   Object representing the exception.
 * @param bool $save
 *   (optional) Whether to save the message in the migration's mapping table.
 *   Set to FALSE in contexts where this doesn't make sense.
 */
protected function handleException(\Exception $exception, $save = TRUE) {
  $decoded = Error::decodeException($exception);
  $text = sprintf('%s (%s:%s)', $decoded['@message'], $decoded['%file'], $decoded['%line']);
  if ($save) {
    $this->saveMessage($text);
  }
  $this->message->display($text, 'error');
}
/**
 * Checks for exceptional conditions, and displays feedback.
 *
 * @return int
 *   MigrationInterface::RESULT_INCOMPLETE when the memory threshold has been
 *   exceeded, MigrationInterface::RESULT_COMPLETED otherwise.
 */
protected function checkStatus() {
  return $this->memoryExceeded()
    ? MigrationInterface::RESULT_INCOMPLETE
    : MigrationInterface::RESULT_COMPLETED;
}
/**
 * Tests whether we've exceeded the desired memory threshold.
 *
 * When usage is above the threshold a warning is displayed and a memory
 * reclaim is attempted. The operation is only considered over the limit when
 * usage is still above 90% of the threshold after reclaiming.
 *
 * @return bool
 *   TRUE if the threshold is exceeded, otherwise FALSE.
 */
protected function memoryExceeded() {
  $usage = $this->getMemoryUsage();
  $threshold = $this->memoryThreshold;
  // Without a threshold there is nothing to check, and without a positive
  // limit the usage ratio cannot be computed (it would divide by zero).
  if (!$threshold || $this->memoryLimit <= 0) {
    return FALSE;
  }

  $pct_memory = $usage / $this->memoryLimit;
  if ($pct_memory <= $threshold) {
    return FALSE;
  }

  $arguments = array(
    '@pct' => round($pct_memory * 100),
    '@usage' => $this->formatSize($usage),
    '@limit' => $this->formatSize($this->memoryLimit),
  );
  $this->message->display(
    $this->t('Memory usage is @usage (@pct% of limit @limit), reclaiming memory.', $arguments),
    'warning');

  $usage = $this->attemptMemoryReclaim();
  $pct_memory = $usage / $this->memoryLimit;
  // Both follow-up messages report the same post-reclaim figures.
  $arguments = array(
    '@pct' => round($pct_memory * 100),
    '@usage' => $this->formatSize($usage),
    '@limit' => $this->formatSize($this->memoryLimit),
  );

  // Use a lower threshold - we don't want to be in a situation where we keep
  // coming back here and trimming a tiny amount.
  if ($pct_memory > (0.90 * $threshold)) {
    $this->message->display(
      $this->t('Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch', $arguments),
      'warning');
    return TRUE;
  }

  $this->message->display(
    $this->t('Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing', $arguments),
    'warning');
  return FALSE;
}
/**
 * Reports how much memory is currently in use.
 *
 * @return int
 *   The memory usage, as reported by memory_get_usage().
 */
protected function getMemoryUsage() {
  $bytes_in_use = memory_get_usage();
  return $bytes_in_use;
}
/**
 * Tries to reclaim memory.
 *
 * Resets Drupal's static storage and every entity storage cache, then runs
 * the garbage collector so that freed reference cycles are actually released
 * before usage is measured again.
 *
 * @return int
 *   The memory usage after reclaim.
 */
protected function attemptMemoryReclaim() {
  // First, try resetting Drupal's static storage - this frequently releases
  // plenty of memory to continue.
  drupal_static_reset();

  // Entity storage can blow up with caches so clear them out. Only the
  // entity type IDs are needed to look up each storage handler.
  $manager = \Drupal::entityManager();
  foreach (array_keys($manager->getDefinitions()) as $entity_type_id) {
    $manager->getStorage($entity_type_id)->resetCache();
  }

  // @TODO: explore resetting the container.

  // Collect reference cycles left behind by the caches dropped above.
  gc_collect_cycles();

  // Measure through the same method memoryExceeded() uses, so both figures
  // stay comparable when getMemoryUsage() is overridden.
  return $this->getMemoryUsage();
}
/**
 * Turns a byte count into a string by delegating to format_size().
 *
 * @param int $size
 *   A size in bytes.
 *
 * @return string
 *   A translated string representation of the size.
 */
protected function formatSize($size) {
  $formatted = format_size($size);
  return $formatted;
}
| } |