PoshCode Logo PowerShell Code Repository

Split-Job V 1.2 by Stephen Mills 25 months ago (modification of post by Arnoud Jansveld view diff)
View followups from Kerry Boomsliter | diff | embed code: <script type="text/javascript" src="http://PoshCode.org/embed/2621"></script>download | new post

The Split-Job function provides easy multithreading at the command line or in a script. It was created from a system administrator’s point of view and is compatible with PS v1. Supports importing functions, variables and snapins. For history and background please visit http://www.jansveld.net/powershell.
This is Version 1.2 and only work in PowerShell V 2. It supports powershell_ise and will cancel pipelines if escape is pressed. It also includes various other minor improvements.

  1. #requires -version 2.0
  2. <#
  3. ################################################################################
  4. ## Run commands in multiple concurrent pipelines
  5. ##   by Arnoud Jansveld - www.jansveld.net/powershell
  6. ##
  7. ## Basic "drop in" usage examples:
  8. ##   - Functions that accept pipelined input:
  9. ##       Without Split-Job:
  10. ##          Get-Content hosts.txt | MyFunction | Export-Csv results.csv
  11. ##       With Split-Job:
  12. ##          Get-Content hosts.txt | Split-Job {MyFunction} | Export-Csv results.csv
  13. ##   - Functions that do not accept pipelined input (use foreach):
  14. ##       Without Split-Job:
  15. ##          Get-Content hosts.txt |% { .\MyScript.ps1 -ComputerName $_ } | Export-Csv results.csv
  16. ##       With Split-Job:
  17. ##          Get-Content hosts.txt | Split-Job {%{ .\MyScript.ps1 -ComputerName $_ }} | Export-Csv results.csv
  18. ##
  19. ## Example with an imported function:
  20. ##       function Test-WebServer ($ComputerName) {
  21. ##           $WebRequest = [System.Net.WebRequest]::Create("http://$ComputerName")
  22. ##           $WebRequest.GetResponse()
  23. ##       }
  24. ##       Get-Content hosts.txt | Split-Job {%{Test-WebServer $_ }} -Function Test-WebServer | Export-Csv results.csv
  25. ##
  26. ## Example with importing a module
  27. ##       Get-Content Clusters.txt | Split-Job { % { Get-Cluster -Name $_ } } -InitializeScript { Import-Module FailoverClusters }
  28. ##     
  29. ##
  30. ## Version History
  31. ## 1.2    Changes by Stephen Mills - stephenmills at hotmail dot com
  32. ##        Only works with PowerShell V2
  33. ##        Modified error output to use ErrorRecord parameter of Write-Error - catches Category Info then.
  34. ##        Works correctly in powershell_ise.  Previous version would let pipelines continue if ESC was pressed.  If Escape pressed, then it will do an async cancel of the pipelines and exit.
  35. ##        Add seconds remaining to progress bar
  36. ##        Parameters Added and related functionality:
  37. ##           InitializeScript - allows to have custom scripts to initilize ( Import-Module ...), parameter might be renamed Begin in the future.
  38. ##           MaxDuration - Cancel all pending and in process items in queue if the number of seconds is reached before all input is done.
  39. ##           ProgressInfo - Allows you to add additional text to progress bar
  40. ##           NoProgress - Hide Progress Bar
  41. ##           DisplayInterval - frequency to update Progress bar in milliseconds
  42. ##           InputObject - not yet used, planned to be used in future to support start processing the queue before pipeline isn't finished yet
  43. ##        Added example for importing a module.
  44. ## 1.0    First version posted on poshcode.org
  45. ##        Additional runspace error checking and cleanup
  46. ## 0.93   Improve error handling: errors originating in the Scriptblock now
  47. ##        have more meaningful output
  48. ##        Show additional info in the progress bar (thanks Stephen Mills)
  49. ##        Add SnapIn parameter: imports (registered) PowerShell snapins
  50. ##        Add Function parameter: imports functions
  51. ##        Add SplitJobRunSpace variable; allows scripts to test if they are
  52. ##        running in a runspace
  53. ## 0.92   Add UseProfile switch: imports the PS profile
  54. ##        Add Variable parameter: imports variables
  55. ##        Add Alias parameter: imports aliases
  56. ##        Restart pipeline if it stops due to an error
  57. ##        Set the current path in each runspace to that of the calling process
  58. ## 0.91   Revert to v 0.8 input syntax for the script block
  59. ##        Add error handling for empty input queue
  60. ## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
  61. ##        if a ScriptBlock is specified, a foreach {} wrapper is added
  62. ## 0.8    Adds a progress bar
  63. ## 0.7    Stop adding runspaces if the queue is already empty
  64. ## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
  65. ################################################################################
  66. #>
  67.  
  68. function Split-Job
  69. {
  70.         param (
  71.                 [Parameter(Position=0, Mandatory=$true)]$Scriptblock,
  72.                 [Parameter()][int]$MaxPipelines=10,
  73.                 [Parameter()][switch]$UseProfile,
  74.                 [Parameter()][string[]]$Variable,
  75.                 [Parameter()][string[]]$Function = @(),
  76.                 [Parameter()][string[]]$Alias = @(),
  77.                 [Parameter()][string[]]$SnapIn,
  78.                 [Parameter()][float]$MaxDuration = $( [Int]::MaxValue ),
  79.                 [Parameter()][string]$ProgressInfo ='',
  80.                 [Parameter()][int]$ProgressID = 0,
  81.                 [Parameter()][switch]$NoProgress,
  82.                 [Parameter()][int]$DisplayInterval = 300,
  83.                 [Parameter()][scriptblock]$InitializeScript,
  84.                 [Parameter(ValueFromPipeline=$true)][object[]]$InputObject
  85.         )
  86.  
  87.         begin
  88.         {
  89.                 $StartTime = Get-Date
  90.                 #$DisplayTime = $StartTime.AddMilliseconds( - $DisplayInterval )
  91.                 $ExitForced = $false
  92.  
  93.  
  94.                  function Init ($InputQueue){
  95.                         # Create the shared thread-safe queue and fill it with the input objects
  96.                         $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($InputQueue))
  97.                         $QueueLength = $Queue.Count
  98.                         # Do not create more runspaces than input objects
  99.                         if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
  100.                         # Create the script to be run by each runspace
  101.                         $Script  = "Set-Location '$PWD'; "
  102.                         $Script += {
  103.                                 $SplitJobQueue = $($Input)
  104.                                 & {
  105.                                         trap {continue}
  106.                                         while ($SplitJobQueue.Count) {$SplitJobQueue.Dequeue()}
  107.                                 } |
  108.                         }.ToString() + $Scriptblock
  109.  
  110.                         # Create an array to keep track of the set of pipelines
  111.                         $Pipelines = New-Object System.Collections.ArrayList
  112.  
  113.                         # Collect the functions and aliases to import
  114.                         $ImportItems = ($Function -replace '^','Function:') +
  115.                                 ($Alias -replace '^','Alias:') |
  116.                                 Get-Item | select PSPath, Definition
  117.                         $stopwatch = [System.Diagnostics.Stopwatch]::StartNew()
  118.                 }
  119.  
  120.                 function Add-Pipeline {
  121.                         # This creates a new runspace and starts an asynchronous pipeline with our script.
  122.                         # It will automatically start processing objects from the shared queue.
  123.                         $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
  124.                         $Runspace.Open()
  125.                         if (!$?) { throw "Could not open runspace!" }
  126.                         $Runspace.SessionStateProxy.SetVariable('SplitJobRunSpace', $True)
  127.  
  128.                         function CreatePipeline
  129.                         {
  130.                                 param ($Data, $Scriptblock)
  131.                                 $Pipeline = $Runspace.CreatePipeline($Scriptblock)
  132.                                 if ($Data)
  133.                                 {
  134.                                         $Null = $Pipeline.Input.Write($Data, $True)
  135.                                         $Pipeline.Input.Close()
  136.                                 }
  137.                                 $Null = $Pipeline.Invoke()
  138.                                 $Pipeline.Dispose()
  139.                         }
  140.  
  141.                         # Optionally import profile, variables, functions and aliases from the main runspace
  142.                        
  143.                         if ($UseProfile)
  144.                         {
  145.                                 CreatePipeline -Script "`$PROFILE = '$PROFILE'; . `$PROFILE"
  146.                         }
  147.  
  148.                         if ($Variable)
  149.                         {
  150.                                 foreach ($var in (Get-Variable $Variable -Scope 2))
  151.                                 {
  152.                                         trap {continue}
  153.                                         $Runspace.SessionStateProxy.SetVariable($var.Name, $var.Value)
  154.                                 }
  155.                         }
  156.                         if ($ImportItems)
  157.                         {
  158.                                 CreatePipeline $ImportItems {
  159.                                         foreach ($item in $Input) {New-Item -Path $item.PSPath -Value $item.Definition}
  160.                                 }
  161.                         }
  162.                         if ($SnapIn)
  163.                         {
  164.                                 CreatePipeline (Get-PSSnapin $Snapin -Registered) {$Input | Add-PSSnapin}
  165.                         }
  166.                        
  167.                         #Custom Initialization Script for startup of Pipeline - needs to be after other other items added.
  168.                         if ($InitializeScript -ne $null)
  169.                         {
  170.                                 CreatePipeline -Scriptblock $InitializeScript
  171.                         }
  172.  
  173.                         $Pipeline = $Runspace.CreatePipeline($Script)
  174.                         $Null = $Pipeline.Input.Write($Queue)
  175.                         $Pipeline.Input.Close()
  176.                         $Pipeline.InvokeAsync()
  177.                         $Null = $Pipelines.Add($Pipeline)
  178.                 }
  179.  
  180.                 function Remove-Pipeline ($Pipeline)
  181.                 {
  182.                         # Remove a pipeline and runspace when it is done
  183.                         $Pipeline.RunSpace.CloseAsync()
  184.                         #Removed Dispose so that Split-Job can be quickly aborted even if currently running something waiting for a timeout.
  185.                         #Added call to [System.GC]::Collect() at end of script to free up what memory it can.
  186.                         #$Pipeline.Dispose()
  187.                         $Pipelines.Remove($Pipeline)
  188.                 }
  189.         }
  190.  
  191.         end
  192.         {
  193.                
  194.  
  195.  
  196.                 # Main
  197.                 # Initialize the queue from the pipeline
  198.                 . Init $Input
  199.                 # Start the pipelines
  200.                 try
  201.                 {
  202.                         while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline}
  203.  
  204.                         # Loop through the pipelines and pass their output to the pipeline until they are finished
  205.                         while ($Pipelines.Count)
  206.                         {
  207.                                 # Only update the progress bar once per $DisplayInterval
  208.                                 if (-not $NoProgress -and $Stopwatch.ElapsedMilliseconds -ge $DisplayInterval)
  209.                                 {
  210.                                         $Completed = $QueueLength - $Queue.Count - $Pipelines.count
  211.                                         $Stopwatch.Reset()
  212.                                         $Stopwatch.Start()
  213.                                         #$LastUpdate = $stopwatch.ElapsedMilliseconds
  214.                                         $PercentComplete = (100 - [Int]($Queue.Count)/$QueueLength*100)
  215.                                         $Duration = (Get-Date) - $StartTime
  216.                                         $DurationString = [timespan]::FromSeconds( [Math]::Floor($Duration.TotalSeconds)).ToString()
  217.                                         $ItemsPerSecond = $Completed / $Duration.TotalSeconds
  218.                                         $SecondsRemaining = [math]::Round(($QueueLength - $Completed)/ ( .{ if ($ItemsPerSecond -eq 0 ) { 0.001 } else { $ItemsPerSecond}}))
  219.                                        
  220.                                         Write-Progress -Activity "** Split-Job **  *Press Esc to exit*  Next item: $(trap {continue}; if ($Queue.Count) {$Queue.Peek()})" `
  221.                                                 -status "Queues: $($Pipelines.Count) QueueLength: $($QueueLength) StartTime: $($StartTime)  $($ProgressInfo)" `
  222.                                                 -currentOperation  "$( . { if ($ExitForced) { 'Aborting Job!   ' }})Completed: $($Completed) Pending: $($QueueLength- ($QueueLength-($Queue.Count + $Pipelines.Count))) RunTime: $($DurationString) ItemsPerSecond: $([math]::round($ItemsPerSecond, 3))"`
  223.                                                 -PercentComplete $PercentComplete `
  224.                                                 -Id $ProgressID `
  225.                                                 -SecondsRemaining $SecondsRemaining
  226.                                 }      
  227.                                 foreach ($Pipeline in @($Pipelines))
  228.                                 {
  229.                                         if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline)
  230.                                         {
  231.                                                 $Pipeline.Output.NonBlockingRead()
  232.                                                 $Pipeline.Error.NonBlockingRead() | % { Write-Error -ErrorRecord $_ }
  233.  
  234.                                         } else
  235.                                         {
  236.                                                 # Pipeline has stopped; if there was an error show info and restart it                 
  237.                                                 if ($Pipeline.PipelineStateInfo.State -eq 'Failed')
  238.                                                 {
  239.                                                         Write-Error $Pipeline.PipelineStateInfo.Reason
  240.                                                        
  241.                                                         # Restart the runspace
  242.                                                         if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
  243.                                                 }
  244.                                                 Remove-Pipeline $Pipeline
  245.                                         }
  246.                                         if ( ((Get-Date) - $StartTime).TotalSeconds -ge $MaxDuration -and -not $ExitForced)
  247.                                         {
  248.                                                 Write-Warning "Aborting job! The MaxDuration of $MaxDuration seconds has been reached. Inputs that have not been processed will be skipped."
  249.                                                 $ExitForced=$true
  250.                                         }
  251.                                        
  252.                                         if ($ExitForced) { $Pipeline.StopAsync(); Remove-Pipeline $Pipeline }
  253.                                 }
  254.                                 while ($Host.UI.RawUI.KeyAvailable)
  255.                                 {
  256.                                         if ($Host.ui.RawUI.ReadKey('NoEcho,IncludeKeyDown,IncludeKeyUp').VirtualKeyCode -eq 27 -and !$ExitForced)
  257.                                         {
  258.                                                 $Queue.Clear();
  259.                                                 Write-Warning 'Aborting job! Escape pressed! Inputs that have not been processed will be skipped.'
  260.                                                 $ExitForced = $true;
  261.                                                 #foreach ($Pipeline in @($Pipelines))
  262.                                                 #{
  263.                                                 #       $Pipeline.StopAsync()
  264.                                                 #}
  265.                                         }              
  266.                                 }
  267.                                 if ($Pipelines.Count) {Start-Sleep -Milliseconds 50}
  268.                         }
  269.  
  270.                         #Clear the Progress bar so other apps don't have to keep seeing it.
  271.                         Write-Progress -Completed -Activity "`0" -Status "`0"
  272.  
  273.                         # Since reference to Dispose was removed.  I added this to try to help with releasing resources as possible.
  274.                         # This might be a bad idea, but I'm leaving it in for now. (Stephen Mills)
  275.                         [GC]::Collect()
  276.                 }
  277.                 finally
  278.                 {
  279.                         foreach ($Pipeline in @($Pipelines))
  280.                         {
  281.                                 if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline)
  282.                                 {
  283.                                         Write-Warning 'Pipeline still runinng.  Stopping Async.'
  284.                                         $Pipeline.StopAsync()
  285.                                         Remove-Pipeline $Pipeline
  286.                                 }
  287.                         }
  288.                 }
  289.         }
  290. }

Submit a correction or amendment below (
click here to make a fresh posting)
After submitting an amendment, you'll be able to view the differences between the old and new posts easily.

Syntax highlighting:


Remember me