How to Simplify Data Flow By Multi-Table Synchronization With Apache SeaTunnel

23 Jul 2024

In addition to synchronizing data between single tables, Apache SeaTunnel also supports synchronization from a single table to multiple tables, from multiple tables to a single table, and from multiple tables to multiple tables. Below are brief examples of how to implement each of these cases.

Single Table to Single Table

This is the one-source, one-sink case.

The job below synchronizes a table from MySQL to MySQL with no transform step in between:

env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from base_region"
    }
}
 
transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}
 
sink {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/dw"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "insert into base_region(id,region_name) values(?,?)"
    }
}

Execute the task:


./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

Single Table to Multiple Tables

This is the one-source, multiple-sink case.

The job below synchronizes a single user table from MySQL to MySQL. Two Sql transforms split the rows into male and female users: each transform reads the source output through source_table_name and publishes its own result_table_name, which the matching sink then selects, so the two groups are inserted into different tables:

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name="t_user"
        query = "select * from t_user;"
    }
}
 
transform {
  # Rows whose gender is '男' (male) are routed to t_user_nan
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nan"
    query = "select id,name,birth,gender from t_user where gender ='男';"
  }
  # Rows whose gender is '女' (female) are routed to t_user_nv
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nv"
    query = "select id,name,birth,gender from t_user where gender ='女';"
  }
}
 
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nan"
    query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"
  }
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nv"
    query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"
  }
}

Execute the task:

./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf

Multiple Tables to Single Table

This is the multiple-source, one-sink case.

Assume there is a switch usage table and a router usage table, and the target is an OLAP table that combines the data from both.

The table structures are as follows:

-- dw Source table 1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (
  `event_time` timestamp COMMENT 'Business Time',
  `device_id` VARCHAR(32) COMMENT 'Equipment id',
  `device_type` VARCHAR(32) COMMENT 'Equipment Type',
  `device_name` VARCHAR(128) COMMENT 'Equipment Name',
  `cpu_usage` INT COMMENT 'CPU Usage Rate'
) ;
 
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', 'Switch 1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', 'Switch 2', 65);
 
-- dw Source table 2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (
  `event_time` timestamp COMMENT 'Business Time',
  `device_id` VARCHAR(32) COMMENT 'Equipment id',
  `device_type` VARCHAR(32) COMMENT 'Equipment Type',
  `device_name` VARCHAR(128) COMMENT 'Equipment Name',
  `cpu_usage` INT COMMENT 'CPU Usage Rate'
);
 
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', 'Router 1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', 'Router 2', 46);
 
 
-- -----------------------------------------------------------------------------
-- olap Target table
CREATE TABLE `device_performance` (
  `id` INT NOT NULL AUTO_INCREMENT COMMENT 'Table Primary Key',
  `event_time` VARCHAR(32) NOT NULL COMMENT 'Business Time',
  `device_id` VARCHAR(32) COMMENT 'Equipment id',
  `device_type` VARCHAR(32) COMMENT 'Equipment Type',
  `device_name` VARCHAR(128) NOT NULL COMMENT 'Equipment Name',
  `cpu_usage` FLOAT NOT NULL COMMENT 'CPU Usage Rate Unit %',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`)
) COMMENT='Equipment status';
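
A job configuration for this case can be sketched as follows. It is modeled on the other jobs in this article and is an assumption, not necessarily the exact file used by the command below: each source is tagged with result_table_name, a Sql transform adds the create_time and update_time columns, and two Jdbc sinks insert into the same device_performance table. The intermediate names switch_src, router_src, switch_dst, and router_dst are illustrative.

```hocon
env {
  job.mode = "BATCH"
  job.name = "device_performance"
}

source {
  # Source 1: switch usage rows from the dw database
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw?characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    result_table_name = "switch_src"
    query = "SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance"
  }
  # Source 2: router usage rows from the dw database
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw?characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    result_table_name = "router_src"
    query = "SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance"
  }
}

transform {
  # Add the two timestamp columns that the target table requires
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src"
  }
}

sink {
  # Both sinks write to the same target table; id is left to AUTO_INCREMENT
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/olap?characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "switch_dst"
    query = "INSERT INTO device_performance (event_time, device_id, device_type, device_name, cpu_usage, create_time, update_time) VALUES (?, ?, ?, ?, ?, ?, ?)"
  }
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/olap?characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "router_dst"
    query = "INSERT INTO device_performance (event_time, device_id, device_type, device_name, cpu_usage, create_time, update_time) VALUES (?, ?, ?, ?, ?, ?, ?)"
  }
}
```

Listing the target columns explicitly in the INSERT statement keeps the seven placeholders aligned with the seven columns selected by each transform, regardless of the column order in the table definition.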

Execute the task:

./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf

It works!

Multi-Table to Multi-Table

This is the multiple-source, multiple-sink case.

The job below synchronizes the switch usage data and the router usage data to their corresponding destination tables, processing each stream with an intermediate Sql transform:

env {
    job.mode="BATCH"
    job.name="device_performance"
}
 
source {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="switch_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
    }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="router_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
    }
}
 
transform {
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  }
}
 
sink {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "switch_dst"
        query="INSERT INTO device_performance_switch VALUES(null, ?, ?, ?, ?, ?, ?, ?);"
    }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "router_dst"
        query="INSERT INTO device_performance_router VALUES(null, ?, ?, ?, ?, ?, ?, ?);"
    }
}

Conclusion

In summary, Apache SeaTunnel's multi-table synchronization is efficient, real-time, reliable, and flexible, and the same env, source, transform, and sink layout covers one-to-one, one-to-many, many-to-one, and many-to-many jobs. With it, enterprises can move data smoothly between different systems and databases and manage and use that data more efficiently, which in turn supports business development.

We hope this article helps readers better understand and apply Apache SeaTunnel multi-table synchronization, and opens up more possibilities for enterprise data synchronization.