Execute Stored Procedures in Parallel

By | September 6, 2010

After my post here, It seems that everything is working fine but I need performance out of it an my last resort is to Execute Stored Procedures in Parallel so I can run multiple instances in one run.  To achieve that I need to create a CLR Stored Procedure so I can run Execute commands in a thread.  So what do I need to achieve that?

    1. You need SQL 2008 or later and enable CLR
    2. You also need to set the Database Trustworthy Flag to On

To do that use these commands:


sp_configure 'clr enabled', 1
GO
RECONFIGURE
GO

ALTER DATABASE [YourDatabase] SET TRUSTWORTHY ON
GO
  1. You also need an Account to establish new connections based on Integrated Security
  2. You need Visual Studio to develop the CLR Stored Procedure whit .NET Framework 2.0 or later installed

So what exactly is a CLR Stored Procedure? According to Microsoft

CLR Stored procedures are routines that cannot be used in scalar expressions. Unlike scalar functions, they can return tabular results and messages to the client, invoke data definition language (DDL) and data manipulation language (DML) statements, and return output parameters.
In SQL Server, you can develop a database objects inside an instance of SQL Server that is programmed in an assembly created in the Microsoft .NET Framework common language runtime (CLR). Database objects that can leverage the rich programming model provided by the CLR include triggers, stored procedures, functions, aggregate functions, and types.Creating a CLR stored procedure in SQL Server involves the following steps:

  • Define the stored procedure as a static method of a class in a language supported by the .NET Framework. For more information about how to program CLR stored procedures, see CLR Stored Procedures. Then, compile the class to build an assembly in the .NET Framework by using the appropriate language compiler.
  • Register the assembly in SQL Server by using the CREATE ASSEMBLY statement. For more information about how to work with assemblies in SQL Server, see Assemblies.
  • Create the stored procedure that references the registered assembly by using the CREATE PROCEDURE statement.

Now having said that lets start creating our CLR Stored Procedure, fire up the Visual Studio and choose a Database/SQL Server Project

New Database Project

Once it is created it will ask you for the Database and choose the database you want run the CLR Stored Procedure from.  Then the coding begins:

First you need to add a new item which is a stored procedure

Stored Procedure Item

As you might have noticed you can also add an Aggregate, User Defined Function, Triggers and User Defined Types.  Once you added the Stored Procedure then the fun begins.  Here is what I have done.

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Threading;
using Microsoft.SqlServer.Server;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using SampleConsole;

namespace Parallel_Execution
{
public partial class StoredProcedures
{
[Microsoft.SqlServer.Server.SqlProcedure]
public static SqlInt32 spExecuteParallel(string DB, int MaxDOP, string TSQL, int msDelay, int Retries)
{
    // Initialize Variables
    SqlConnection oConn = new SqlConnection();
    SqlCommand oCmd = new SqlCommand();
    List<string> oErrorString = new List<string>();
    object oLocker = new object();
    string sServer = null;

    List<Thread> oThread = new List<Thread>();
    StringCollection sStopped = new StringCollection();

    // Get Server Instance Name
    oConn = new SqlConnection("context connection = true;");
    oConn.Open();

    oCmd = oConn.CreateCommand();
    oCmd.CommandText = "SELECT @@SERVERNAME";
    sServer = oCmd.ExecuteScalar().ToString();

    oCmd.Dispose();
    oConn.Close();
    oConn.Dispose();

    // Execute Threads
    int iCurrentThread = 0;
    while (iCurrentThread < MaxDOP)
    {
        ExecuteSQL Executer = new ExecuteSQL(sServer, DB, TSQL.Replace("?", DB.ToString().Trim()), Retries, ref oErrorString, ref oLocker);

        Thread oItem = new Thread(Executer.Process);
        oItem.Name = "ExecuteSQL " + DB.ToString().Trim();
        oItem.Start();
        oThread.Add(oItem);

        SqlContext.Pipe.Send(DateTime.Now.ToLongTimeString() + " : Start : " + oItem.Name.Replace("ExecuteSQL """));
        Thread.Sleep(msDelay);

        while (RunningThreads(ref oThread, ref sStopped) >= MaxDOP)
        {
            Thread.Sleep(1000);
        }
        iCurrentThread++;
    }

    // Wait for all Threads to Stop
    while (RunningThreads(ref oThread, ref sStopped) > 0)
    {
        Thread.Sleep(1000);
    }
    SqlContext.Pipe.Send("All Thread have Stopped with " + oErrorString.Count.ToString() + " Error/s ");

    if (oErrorString.Count > 0)
    {
        foreach (string sIndividualErrors in oErrorString)
        {
            SqlContext.Pipe.Send(sIndividualErrors.ToString());
        }

        throw new Exception("Error Occurred.");
    }

    return 0 - oErrorString.Count;
}

public static int RunningThreads(ref List<Thread> oThread, ref StringCollection oStops)
{
    int iRunningCount = 0;

    foreach (Thread oIndividualThread in oThread)
    {
        if (oIndividualThread.IsAlive)
        {
            iRunningCount += 1;
        }
        else if (!oStops.Contains(oIndividualThread.Name))
        {
            oStops.Add(oIndividualThread.Name);
            SqlContext.Pipe.Send(DateTime.Now.ToLongTimeString() + " : Stop  : " + oIndividualThread.Name.Replace("ExecuteSQL """));
        }
    }
    return iRunningCount;
}
}
}

Now you might have noticed that I have a class called ExecuteSQL, that class is the actual class that performs the execute, the code above just takes care of the threads.  Here is the code for ExecuteSQL:

using System.Data.SqlClient;
using System.Threading;
using System.Data;
using System;
using System.Collections.Generic;
namespace Parallel_Execution
{
class ExecuteSQL
{
private List<string> oExecuteErrors;
private object oExecuteLocker;
private string sExecuteServer;
private string sExecuteDB;
private string sExecuteTSQL;
private int iExecuteRetries;

public ExecuteSQL(string sServer, string sDB, string sTSQL, int iRetries, ref List<string> oErrors, ref object oLocker)
{
    this.sExecuteServer = sServer;
    this.sExecuteDB = sDB;
    this.sExecuteTSQL = sTSQL;
    this.iExecuteRetries = iRetries;
    this.oExecuteErrors = oErrors;
    this.oExecuteLocker = oLocker;
}

public void Process()
{
    int iTries = 1;
    SqlConnection oConn = new SqlConnection();

Retry:
    oConn = new SqlConnection("Data Source=" + sExecuteServer + ";Initial Catalog=" + sExecuteDB + ";Integrated Security=SSPI;");
    try
    {
        oConn.Open();

        if (oConn.State == ConnectionState.Open)
        {
            SqlCommand oCmd = oConn.CreateCommand();
            oCmd.CommandText = sExecuteTSQL;
            oCmd.CommandTimeout = 0;
            oCmd.ExecuteNonQuery();

            oCmd.Dispose();
            oConn.Close();
            oConn.Dispose();
        }
        else
        {
            throw new Exception("SQL Server not Found or Unable to Connect to SQL Server");
        }
    }
    catch (Exception ex)
    {
        if (oConn.State != ConnectionState.Closed) oConn.Close();
        oConn.Dispose();

        if (iTries <= iExecuteRetries)
        {
            Thread.Sleep(5000);
            iTries += 1;
            goto Retry;
        }
        else
        {
            lock (oExecuteLocker)
            {
                char cSpace = char.Parse(" ");
                oExecuteErrors.Add(this.sExecuteDB.PadRight(16, cSpace) + " : " + ex.Message);
            }
        }
    }
}
}
}

Once all done you can now deploy you CLR Stored Procedure, and once you had deployed it it will be now available in your chosen database in the same place you find your stored procedures.

The deployed CLR Stored Procedure

Once it is there you can now use it to execute stored procedures in Parallel, in this case I had created a table so that I can insert sample data with Date.  To use it you have the following parameters

  • DB – Your Database
  • MaxDOP – The number of Threads you want to use
  • TSQL – The TSQL you want to execute
  • msDelay – Delay before you run Next Thread
  • Retries – Retry Count if you encounter errors

To show you a sample onhow I use it

USE [SampleDB]
GO

DECLARE    @return_value int

EXEC    @return_value = [dbo].[spExecuteParallel]
 @DB = N'SampleDB',
 @MaxDOP = 8,
 @TSQL = N'Insert into TestTable ([Message], LogDate) values (''Test'', GetDate())',
 @msDelay = 0,
 @Retries = 1

SELECT    'Return Value' = @return_value
GO

After you run this check your message and table for the results!

Here are my results

Results

Table Results


21 thoughts on “Execute Stored Procedures in Parallel

  1. jon

    Great post, done everyting as describet but I get this…

    A .NET Framework error occurred during execution of user-defined routine or aggregate “spExecuteParallel”:
    System.Security.HostProtectionException: Attempted to perform an operation that was forbidden by the CLR host.

    The protected resources (only available with full trust) were: All
    The demanded resources were: ExternalThreading

    System.Security.HostProtectionException:
    at ParallelExecution.ParallelExecution.StoredProcedures.spExecuteParallel(String DB, Int32 MaxDOP, String TSQL, Int32 msDelay, Int32 Retries)

    Any suggestions?

    Reply
  2. rsmacaalay

    Can you set the permission level of the project property to “unsafe”, the re deploy it. Then let me know what happens.

    Reply
    1. jon

      Sorry, my mistake-forgot about that. Thanks again for the post.

      Reply
  3. belial

    im getting this error when setting the assembly permission level to unsafe (from visual studio)…

    Error 1 CREATE ASSEMBLY for assembly ‘SqlServerProject1′ failed because assembly ‘SqlServerProject1′ is not authorized for PERMISSION_SET = UNSAFE. The assembly is authorized when either of the following is true: the database owner (DBO) has UNSAFE ASSEMBLY permission and the database has the TRUSTWORTHY database property on; or the assembly is signed with a certificate or an asymmetric key that has a corresponding login with UNSAFE ASSEMBLY permission. SqlServerProject1

    Reply
    1. rsmacaalay

      Try this. Check if your Datanase Trustworthy is set to true by right clicking on the Database and go to properties then choose options

      Trustworthy Settings

      If it is then set it to false by running this T-SQL
      ALTER DATABASE {YourDatabaseName}
      SET TRUSTWORTHY ON
      GO

      Reply
  4. belial

    now i cant see the stored running but im getting this errors…
    many thanks for posting back…

    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:01 PM : Start : InternalSiteDB_Dev
    4:02:21 PM : Stop : InternalSiteDB_Dev
    All Thread have Stopped with 8 Error/s
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    InternalSiteDB_Dev : A network-related or instance-specific error occurred while establishing a connection to SQL Server. The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server is configured to allow remote connections. (provider: SQL Network Interfaces, error: 26 – Error Locating Server/Instance Specified)
    Msg 6522, Level 16, State 1, Procedure spExecuteParallel, Line 0
    A .NET Framework error occurred during execution of user-defined routine or aggregate “spExecuteParallel”:
    System.Exception: Error Occurred.
    System.Exception:
    at StoredProcedures.spExecuteParallel(String DB, Int32 MaxDOP, String TSQL, Int32 msDelay, Int32 Retries)
    .

    (1 row(s) affected)

    Reply
  5. belial

    oCmd.CommandText = “SELECT @@SERVERNAME”;

    IS RETURNING THE WRONG NAME FOR THE SERVER IN SOME INSTALLATIONS.

    “SERVERNAMEMSSQLSERVER_2008″ DOES NOT WORK BUT CONNECTING TO “SERVERNAME” WORKS

    Reply
  6. belial

    i just changed the server name manually…

    oCmd = oConn.CreateCommand();
    oCmd.CommandText = “SELECT @@SERVERNAME”;
    sServer = “RealServerNameWithNoInstanceName”; //oCmd.ExecuteScalar().ToString()

    Reply
  7. Michael Brönnimann

    Parallelization for queries (SELECT) is covered quite well by the SQL engine itself, but when it comes to large volume data modifications (UPDATE, INSERT, DELETE), the standard engine does parallelize towards best use of all available resources (disk, multiple cpu-cores, etc.).

    Therefore you may have a look into the approach of SQL Parallel Boost

    This approach can also be used to execute multiple SQL statements in parallel.

    A purely SQL engine related parallelisation solution takes advantage of minimized complexity and has no ‘external’ components like SSIS involved, Furthermore it’s the best performing solution regarding task splitting and synchronization, as it hasn’t potential connection and communication overhead. The overall performance gain thru parallelisation with SQL Parallel Boost is up to 10 !

    In case you don’t wan’t to rebuild your own solution, SQL Parallel Boost provides a self-contained pure T-SQL based solution, which can be easily embedded in existing applications and ETL process tasks.

    Reply
      1. arsinx

        thank you sir . But i can’t get a result set when using select statement . can you give me a sample code that returns a result set and how they are implemented in vb.net?

        thank you in advance sir!
        your reply will be much appreciated!

        Reply
      2. arsinx

        By the way sir, can you do it in windows form rather than c#? if its ok, but if its time consuming anything will do.

        thank you!

        Reply
  8. arsinx

    thank you sir . But i can’t get a result set when using select statement . can you give me a sample code that returns a result set and how they are implemented in vb.net?

    thank you in advance sir!
    your reply will be much appreciated!

    Reply
      1. arsinx

        __
        I’m a bit confuse about the user define Function checkThreads() and “If errors.Count > 0 Then …”. I am just using one database, how can i change the flow of the code that only uses one database and return a result set ?

        Here the code:

        Imports System
        Imports System.Data
        Imports System.Data.SqlClient
        Imports System.Data.SqlTypes
        Imports System.Threading
        Imports Microsoft.SqlServer.Server

        Partial Public Class StoredProcedures

        _
        Public Shared Function EXEC_PARALLEL(ByVal MaxDOP As Integer, _
        ByVal TSQL As String, ByVal msDelay As Integer, _
        ByVal Retries As Integer) As SqlInt32

        ‘ Initialize variables

        Dim errors As New Generic.List(Of String)
        Dim locker As New Object

        Dim DB As String = “inventory”

        Dim threads As New Collections.Generic.List(Of Thread)
        Dim stopped As New Specialized.StringCollection
        Dim item As Thread
        Dim executer As ExecSql

        ‘ Get Server Instance Name
        Dim conn As SqlConnection
        Dim cmd As SqlCommand
        Dim server As String
        conn = New SqlConnection(“context connection = true;”)
        conn.Open()

        cmd = conn.CreateCommand()
        cmd.CommandText = “SELECT @@SERVERNAME”
        server = cmd.ExecuteScalar().ToString()

        cmd.Dispose()
        conn.Close()
        conn.Dispose()

        ‘ Execute threads
        executer = New ExecSql(server, DB, TSQL.Replace(“?”, DB), Retries, errors, locker)
        item = New Thread(AddressOf executer.run)
        item.Name = “ExecSql ” & DB
        item.Start()
        threads.Add(item)

        SqlContext.Pipe.Send(DateTime.Now.ToShortTimeString() & ” : Start : ” & item.Name.Replace(“ExecSql “, “”))
        Thread.Sleep(msDelay)

        ‘While checkThreads(threads, stopped) >= MaxDOP
        ‘ Thread.Sleep(1000)
        ‘End While

        ” Wait for all threads to stop
        ‘While checkThreads(threads, stopped) > 0
        ‘ Thread.Sleep(1000)
        ‘End While
        SqlContext.Pipe.Send(“All threads have stopped with ” & errors.Count.ToString() & ” errors: “)

        If errors.Count > 0 Then
        keyloop = errors.GetEnumerator()
        While keyloop.MoveNext()
        SqlContext.Pipe.Send(keyloop.Current().ToString)
        End While

        Throw New Exception(“ERRORS OCCURRED.”)
        End If

        Return 0 – errors.Count
        End Function

        Public Shared Function checkThreads(ByRef threads As Collections.Generic.List(Of Thread), _
        ByRef stops As Specialized.StringCollection) As Integer
        Dim item As Thread
        Dim running As Integer = 0

        Dim keyloop As IEnumerator = threads.GetEnumerator()
        While keyloop.MoveNext()
        item = CType(keyloop.Current(), Thread)
        If item.IsAlive Then
        running += 1
        ElseIf Not stops.Contains(item.Name) Then
        stops.Add(item.Name)
        SqlContext.Pipe.Send(DateTime.Now.ToShortTimeString() & ” : Stop : ” & item.Name.Replace(“ExecSql “, “”))
        End If
        End While

        Return running
        End Function
        End Class

        Public Class ExecSql
        Private ExecErrors As Generic.List(Of String)
        Private ExecLocker As Object
        Private ExecServer As String
        Private ExecDB As String
        Private ExecTSQL As String
        Private ExecRetries As Integer

        Public Sub New(ByVal Server As String, ByVal DB As String, ByVal TSQL As String, ByVal retries As Integer, _
        ByRef errors As Generic.List(Of String), ByRef locker As Object)
        Me.ExecServer = Server
        Me.ExecDB = DB
        Me.ExecTSQL = TSQL
        Me.ExecRetries = retries
        Me.ExecErrors = errors
        Me.ExecLocker = locker
        End Sub

        Public Sub run()
        Dim tries As Integer = 1
        Dim conn As SqlConnection
        Dim reader As SqlDataReader
        Retry:
        conn = New SqlConnection(“Data Source=” & ExecServer & “;Initial Catalog=” & ExecDB & “;Integrated Security=SSPI;”)
        Try
        conn.Open()

        If conn.State = ConnectionState.Open Then
        Dim cmd As SqlCommand = conn.CreateCommand()
        cmd.CommandText = ExecTSQL
        cmd.CommandTimeout = 0
        reader = cmd.ExecuteReader()

        cmd.Dispose()
        conn.Close()
        conn.Dispose()
        Else
        Throw New Exception(“Unable to connect to SQL Server.”)
        End If
        Catch e As Exception
        If conn.State ConnectionState.Closed Then conn.Close()
        conn.Dispose()
        ‘System.Diagnostics.EventLog.WriteEntry(“SQL_EXEC_PARALLEL”, e.Message)
        If tries <= ExecRetries Then
        Thread.Sleep(5000)
        tries += 1
        GoTo Retry
        Else
        SyncLock ExecLocker
        ExecErrors.Add(Me.ExecDB.PadRight(16, CChar(" ")) + " : " + e.Message)
        End SyncLock
        End If
        End Try
        End Sub

        End Class

        Reply
  9. arsinx

    I’m a bit confuse about the user define Function checkThreads() and “If errors.Count > 0 Then …”. I am just using one database, how can i change the flow of the code that only uses one database and return a result set ?

    Here the code:

    Imports System
    Imports System.Data
    Imports System.Data.SqlClient
    Imports System.Data.SqlTypes
    Imports System.Threading
    Imports Microsoft.SqlServer.Server

    Partial Public Class StoredProcedures

    _
    Public Shared Function EXEC_PARALLEL(ByVal MaxDOP As Integer, _
    ByVal TSQL As String, ByVal msDelay As Integer, _
    ByVal Retries As Integer) As SqlInt32

    ‘ Initialize variables

    Dim errors As New Generic.List(Of String)
    Dim locker As New Object

    Dim DB As String = “inventory”

    Dim threads As New Collections.Generic.List(Of Thread)
    Dim stopped As New Specialized.StringCollection
    Dim item As Thread
    Dim executer As ExecSql

    ‘ Get Server Instance Name
    Dim conn As SqlConnection
    Dim cmd As SqlCommand
    Dim server As String
    conn = New SqlConnection(“context connection = true;”)
    conn.Open()

    cmd = conn.CreateCommand()
    cmd.CommandText = “SELECT @@SERVERNAME”
    server = cmd.ExecuteScalar().ToString()

    cmd.Dispose()
    conn.Close()
    conn.Dispose()

    ‘ Execute threads
    executer = New ExecSql(server, DB, TSQL.Replace(“?”, DB), Retries, errors, locker)
    item = New Thread(AddressOf executer.run)
    item.Name = “ExecSql ” & DB
    item.Start()
    threads.Add(item)

    SqlContext.Pipe.Send(DateTime.Now.ToShortTimeString() & ” : Start : ” & item.Name.Replace(“ExecSql “, “”))
    Thread.Sleep(msDelay)

    ‘While checkThreads(threads, stopped) >= MaxDOP
    ‘ Thread.Sleep(1000)
    ‘End While

    ” Wait for all threads to stop
    ‘While checkThreads(threads, stopped) > 0
    ‘ Thread.Sleep(1000)
    ‘End While
    SqlContext.Pipe.Send(“All threads have stopped with ” & errors.Count.ToString() & ” errors: “)

    If errors.Count > 0 Then
    keyloop = errors.GetEnumerator()
    While keyloop.MoveNext()
    SqlContext.Pipe.Send(keyloop.Current().ToString)
    End While

    Throw New Exception(“ERRORS OCCURRED.”)
    End If

    Return 0 – errors.Count
    End Function

    Public Shared Function checkThreads(ByRef threads As Collections.Generic.List(Of Thread), _
    ByRef stops As Specialized.StringCollection) As Integer
    Dim item As Thread
    Dim running As Integer = 0

    Dim keyloop As IEnumerator = threads.GetEnumerator()
    While keyloop.MoveNext()
    item = CType(keyloop.Current(), Thread)
    If item.IsAlive Then
    running += 1
    ElseIf Not stops.Contains(item.Name) Then
    stops.Add(item.Name)
    SqlContext.Pipe.Send(DateTime.Now.ToShortTimeString() & ” : Stop : ” & item.Name.Replace(“ExecSql “, “”))
    End If
    End While

    Return running
    End Function
    End Class

    Public Class ExecSql
    Private ExecErrors As Generic.List(Of String)
    Private ExecLocker As Object
    Private ExecServer As String
    Private ExecDB As String
    Private ExecTSQL As String
    Private ExecRetries As Integer

    Public Sub New(ByVal Server As String, ByVal DB As String, ByVal TSQL As String, ByVal retries As Integer, _
    ByRef errors As Generic.List(Of String), ByRef locker As Object)
    Me.ExecServer = Server
    Me.ExecDB = DB
    Me.ExecTSQL = TSQL
    Me.ExecRetries = retries
    Me.ExecErrors = errors
    Me.ExecLocker = locker
    End Sub

    Public Sub run()
    Dim tries As Integer = 1
    Dim conn As SqlConnection
    Dim reader As SqlDataReader
    Retry:
    conn = New SqlConnection(“Data Source=” & ExecServer & “;Initial Catalog=” & ExecDB & “;Integrated Security=SSPI;”)
    Try
    conn.Open()

    If conn.State = ConnectionState.Open Then
    Dim cmd As SqlCommand = conn.CreateCommand()
    cmd.CommandText = ExecTSQL
    cmd.CommandTimeout = 0
    reader = cmd.ExecuteReader()

    cmd.Dispose()
    conn.Close()
    conn.Dispose()
    Else
    Throw New Exception(“Unable to connect to SQL Server.”)
    End If
    Catch e As Exception
    If conn.State ConnectionState.Closed Then conn.Close()
    conn.Dispose()
    ‘System.Diagnostics.EventLog.WriteEntry(“SQL_EXEC_PARALLEL”, e.Message)
    If tries <= ExecRetries Then
    Thread.Sleep(5000)
    tries += 1
    GoTo Retry
    Else
    SyncLock ExecLocker
    ExecErrors.Add(Me.ExecDB.PadRight(16, CChar(" ")) + " : " + e.Message)
    End SyncLock
    End If
    End Try
    End Sub

    End Class

    Reply
      1. arsinx

        the code originally uses multiple database, i’m concerned about “Public Shared Function checkThreads” because i think it is design to handle multiple database and also the “If errors.Count” statement . It slows down the execution of query even though only small data is being queried. About the Execute SQL class are you referring to Class ExecSql? I’ve already included in the above code .

        Reply
        1. rsmacaalay

          I think you are missing this on your code ” While iCurrentThread < MaxDOP
          For VB Implementation please refer to this

          Imports System.Data
          Imports System.Data.SqlClient
          Imports System.Data.SqlTypes
          Imports System.Threading
          Imports Microsoft.SqlServer.Server
          Imports System.Collections
          Imports System.Collections.Generic
          Imports System.Collections.Specialized
          Imports SampleConsole
           
          Namespace Parallel_Execution
              Partial Public Class StoredProcedures
                  <Microsoft.SqlServer.Server.SqlProcedure()> _
                  Public Shared Function spExecuteParallel(ByVal DB As StringByVal MaxDOP As IntegerByVal TSQL As StringByVal msDelay As IntegerByVal Retries As IntegerAs SqlInt32
                      ' Initialize Variables
                      Dim oConn As New SqlConnection()
                      Dim oCmd As New SqlCommand()
                      Dim oErrorString As New List(Of String)()
                      Dim oLocker As New Object()
                      Dim sServer As String = Nothing
           
                      Dim oThread As New List(Of Thread)()
                      Dim sStopped As New StringCollection()
           
                      ' Get Server Instance Name
                      oConn = New SqlConnection("context connection = true;")
                      oConn.Open()
           
                      oCmd = oConn.CreateCommand()
                      oCmd.CommandText = "SELECT @@SERVERNAME"
                      sServer = oCmd.ExecuteScalar().ToString()
           
                      oCmd.Dispose()
                      oConn.Close()
                      oConn.Dispose()
           
                      ' Execute Threads
                      Dim iCurrentThread As Integer = 0
                      While iCurrentThread < MaxDOP
                          Dim Executer As New ExecuteSQL(sServer, DB, TSQL.Replace("?", DB.ToString().Trim()), Retries, oErrorString, oLocker)
           
                          Dim oItem As New Thread(Executer.Process)
                          oItem.Name = "ExecuteSQL " & DB.ToString().Trim()
                          oItem.Start()
                          oThread.Add(oItem)
           
                          SqlContext.Pipe.Send(DateTime.Now.ToLongTimeString() & " : Start : " & oItem.Name.Replace("ExecuteSQL """))
                          Thread.Sleep(msDelay)
           
                          While RunningThreads(oThread, sStopped) >= MaxDOP
                              Thread.Sleep(1000)
                          End While
                          iCurrentThread += 1
                      End While
           
                      ' Wait for all Threads to Stop
                      While RunningThreads(oThread, sStopped) > 0
                          Thread.Sleep(1000)
                      End While
                      SqlContext.Pipe.Send("All Thread have Stopped with " & oErrorString.Count.ToString() & " Error/s ")
           
                      If oErrorString.Count > 0 Then
                          For Each sIndividualErrors As String In oErrorString
                              SqlContext.Pipe.Send(sIndividualErrors.ToString())
                          Next
           
                          Throw New Exception("Error Occurred.")
                      End If
           
                      Return 0 - oErrorString.Count
                  End Function
           
                  Public Shared Function RunningThreads(ByRef oThread As List(Of Thread), ByRef oStops As StringCollection) As Integer
                      Dim iRunningCount As Integer = 0
           
                      For Each oIndividualThread As Thread In oThread
                          If oIndividualThread.IsAlive Then
                              iRunningCount += 1
                          ElseIf Not oStops.Contains(oIndividualThread.Name) Then
                              oStops.Add(oIndividualThread.Name)
                              SqlContext.Pipe.Send(DateTime.Now.ToLongTimeString() & " : Stop  : " & oIndividualThread.Name.Replace("ExecuteSQL """))
                          End If
                      Next
                      Return iRunningCount
                  End Function
              End Class
          End Namespace

          And for the Execute SQL Class

          Imports System.Data.SqlClient
          Imports System.Threading
          Imports System.Data
          Imports System.Collections.Generic
          Namespace Parallel_Execution
              Class ExecuteSQL
                  Private oExecuteErrors As List(Of String)
                  Private oExecuteLocker As Object
                  Private sExecuteServer As String
                  Private sExecuteDB As String
                  Private sExecuteTSQL As String
                  Private iExecuteRetries As Integer
           
                  Public Sub New(ByVal sServer As StringByVal sDB As StringByVal sTSQL As StringByVal iRetries As IntegerByRef oErrors As List(Of String), ByRef oLocker As Object)
                      Me.sExecuteServer = sServer
                      Me.sExecuteDB = sDB
                      Me.sExecuteTSQL = sTSQL
                      Me.iExecuteRetries = iRetries
                      Me.oExecuteErrors = oErrors
                      Me.oExecuteLocker = oLocker
                  End Sub
           
                  Public Sub Process()
                      Dim iTries As Integer = 1
                      Dim oConn As New SqlConnection()
          Retry:
           
                      oConn = New SqlConnection("Data Source=" & sExecuteServer & ";Initial Catalog=" & sExecuteDB & ";Integrated Security=SSPI;")
                      Try
                          oConn.Open()
           
                          If oConn.State = ConnectionState.Open Then
                              Dim oCmd As SqlCommand = oConn.CreateCommand()
                              oCmd.CommandText = sExecuteTSQL
                              oCmd.CommandTimeout = 0
                              oCmd.ExecuteNonQuery()
           
                              oCmd.Dispose()
                              oConn.Close()
                              oConn.Dispose()
                          Else
                              Throw New Exception("SQL Server not Found or Unable to Connect to SQL Server")
                          End If
                      Catch ex As Exception
                          If oConn.State <> ConnectionState.Closed Then
                              oConn.Close()
                          End If
                          oConn.Dispose()
           
                          If iTries <= iExecuteRetries Then
                              Thread.Sleep(5000)
                              iTries += 1
                              GoTo Retry
                          Else
                              SyncLock oExecuteLocker
                                  Dim cSpace As Char = Char.Parse(" ")
                                  oExecuteErrors.Add(Me.sExecuteDB.PadRight(16, cSpace) & " : " & ex.Message)
                              End SyncLock
                          End If
                      End Try
                  End Sub
              End Class
          End Namespace

           

          Reply

Leave a Reply