
Newsfeeds

Planet MySQL
Planet MySQL - http://www.planetmysql.org/

  • TokuDB/PerconaFT fragmented data file performance improvements
    In this blog post, we’ll discuss how we’ve improved TokuDB and PerconaFT fragmented data file performance.

    Through our internal benchmarking and some user reports, we have found that with long-term heavy write use, TokuDB/PerconaFT performance can degrade significantly on large data files. Using smaller node sizes (which is one of our performance tuning recommendations when you have faster storage) makes the problem worse. The problem manifests as low CPU utilization, a drop in overall TPS and high client response times during prolonged checkpointing.

    This post explains a little about how PerconaFT structures dictionary files and where the current implementation breaks down. Hopefully, it explains the nature of the issue and how our solution helps address it. It also provides some contrived benchmarks that demonstrate the improvement.

    PerconaFT map file disk format

    NOTE. In this post, the terms index, data file, and dictionary are somewhat interchangeable. We will use the PerconaFT term “dictionary” to refer specifically to a PerconaFT key/value data file.

    PerconaFT stores every dictionary in its own data file on disk. TokuDB stores each index in a PerconaFT dictionary, plus one additional dictionary per table for some metadata. For example, if you have one TokuDB table with two secondary indices, you would have four data files or dictionaries: one small metadata dictionary for the table, one dictionary for the primary key/index, and one for each secondary index.

    Each dictionary file has three major parts:

      • Two headers (yes, two) made up of various bits of metadata, file versions, a checkpoint logical sequence number (CLSN), the offset of this header’s block translation table, etc…
      • Two (yes, two, one per header) block translation tables (BTTs) that map block numbers (BNs) to the physical offsets and sizes of the data blocks within the file.
      • Data blocks and holes (unused space).
    Unlike InnoDB, PerconaFT data blocks (nodes) are variable in size. A block can be anywhere from a few bytes for an empty internal node up to the block size defined when the tree is created (4MB by default if we don’t use compression), depending on the amount of data within that node.

    Each dictionary file contains two versions of the header stored on disk, and only one is valid at any given point in time. Since we fix the size of the header structure, we always know their locations. The first is at offset zero, and the other is immediately after the first. The header that is currently valid is the header with the later/larger CLSN.

    We write the header and the BTT to disk only during a checkpoint or when a dictionary is closed. The new header overwrites the older header (the one with the older CLSN) on disk. From that moment onward, the disk space used by the previous version of the dictionary (the whole thing, not just the header) that is not also used by the latest version is considered immediately free.

    There is much more magic to how PerconaFT handles checkpointing and consistency, but that is really out of the scope of this post. Maybe a later post that addresses the sharp checkpoint of PerconaFT can dive into this.

    The block allocator

    The block allocator is the algorithm and container that manages the list of known used blocks and unused holes within an open dictionary file. When a node gets written, it is the responsibility of the block allocator to find a suitable location in the file for the node’s data. The data is always placed into a new block and never overwrites an existing block (except for reclaimed block space from blocks that were removed or moved and recorded during the last checkpoint). Conversely, when a node gets destroyed, it is the responsibility of the block allocator to release that used space and create a hole out of the old block.
    That hole also must be merged with any other holes that are adjacent to it, so that we have a record of just one large hole rather than a series of consecutive smaller holes.

    Fragmentation and large files

    The current implementation of the PerconaFT block allocator maintains a simple array of used blocks in memory for each open dictionary. The used blocks are ordered ascending by their offset in the file. The holes between the blocks are calculated from the offset and size of the two bounding blocks. For example, the offset and size of the hole between two adjacent blocks are b[n].offset + b[n].size and b[n+1].offset - (b[n].offset + b[n].size), respectively.

    To find a suitable hole to place node data, the current block allocator starts at the first block in the array. It iterates through the blocks looking for a hole between blocks that is large enough to hold the node’s data. Once we find a hole, we cut the space needed for the node out of the hole, and the remainder is left as a hole for another block to possibly use later.

    Note. Forcing alignment to 512-byte offsets for direct I/O has overhead, regardless of whether direct I/O is used.

    This linear search severely degrades PerconaFT performance for very large and fragmented dictionary files. We have some solid evidence from the field that this does occur. We can see it via various profiling tools as a lot of time spent within block_allocator_strategy::first_fit. It is also quite easy to create such a case by using very small node (block) sizes and small fanouts (which forces the existence of more nodes, and thus more small holes). This fragmentation can and does cause all sorts of side effects, as the search operation locks the entire structure within memory. It blocks nodes from translating their node/block IDs into file locations.

    Let’s fix it…

    In this block storage paradigm, fragmentation is inevitable.
    We can try to dance around and propose different ways to prevent fragmentation (at the expense of higher CPU costs, online/offline operations, etc…). Or, we can look at the way the block allocator works and try to make it more efficient. Attacking the latter of the two options is a better strategy (not to say we aren’t still actively looking into the former).

    Tree-based “Max Hole Size” (MHS) lookup

    The linear search block allocator has no idea where bigger and smaller holes might be located within the set (a core limitation). It must use brute force to find a hole big enough for the data it needs to store. To address this, we implemented a new in-memory, tree-based algorithm (red-black tree). This replaces the current in-memory linear array and integrates the hole size search into the tree structure itself.

    In this new block allocator implementation, we store the set of known in-use blocks within the node structure of a binary tree instead of a linear array. We order the tree by the file offset of the blocks. We then added a little extra data to each node of this new tree structure. This data tells us the maximum hole we can expect to find in each child subtree. So now when searching for a hole, we can quickly drill down the tree to find an available hole of the correct size without needing to perform a full linear scan. The trade-off is that merging holes together and updating the parental max hole sizes is slightly more intricate and time-consuming than in a linear structure. The huge improvement in search efficiency makes this extra overhead pure noise.

    In this overly simplified diagram, we have five blocks:

      • offset 0 : 1 byte
      • offset 3 : 2 bytes
      • offset 6 : 3 bytes
      • offset 10 : 5 bytes
      • offset 20 : 8 bytes

    We can calculate four holes in between those blocks:

      • offset 1 : 2 bytes
      • offset 5 : 1 byte
      • offset 9 : 1 byte
      • offset 15 : 5 bytes

    We see that the search for a 4-byte hole traverses down the right side of the tree. It discovers a hole at offset 15.
    This hole is big enough for our 4 bytes. The search finds it without needing to visit the nodes at offsets 0 and 3. For you algorithmic folks out there, we have gone from an O(n) to an O(log n) search. This is tremendously more efficient when we get into severe fragmentation states. A side effect is that we tend to allocate blocks from holes closer to the needed size rather than from the first one big enough to fit. The small hole fragmentation issue may actually increase over time, but that has yet to be seen in our testing.

    Benchmarks

    As our CTO Vadim Tkachenko asserts, there are “Lies, Damned Lies and Benchmarks.” We’re going to show a simple test case where we thought, “What is the worst possible scenario that I can come up with in a small-ish benchmark to show the differences?” So, rather than try to convince you using some pseudo-real-world benchmark that uses sleight of hand, I’m telling you up front that this example is slightly absurd, but pushes the issue to the foreground.

    That scenario is actually pretty simple. We shape the tree to have as many nodes as possible, and intentionally use settings that reduce concurrency. We will use a standard sysbench OLTP test, and run it for about three hours after the prepare stage has completed.

    Hardware:

      • Intel i7, 4 core hyperthread (8 virtual cores) @ 2.8 GHz
      • 16 GB of memory
      • Samsung 850 Pro SSD

    Sysbench OLTP:

      • 1 table of 160M rows or about 30GB of primary key data and 4GB secondary key data
      • 24 threads

    We started each test server instance with no data. Then we ran the sysbench prepare, followed by the sysbench run, with no shutdown in between the prepare and run.
    prepare command:

        /data/percona/sysbench/sysbench/sysbench --test=/data/percona/sysbench/sysbench/tests/db/parallel_prepare.lua --mysql-table-engine=tokudb --oltp-tables-count=1 --oltp-table-size=160000000 --mysql-socket=$(PWD)/var/mysql.sock --mysql-user=root --num_threads=1 run

    run command:

        /data/percona/sysbench/sysbench/sysbench --test=/data/percona/sysbench/sysbench/tests/db/oltp.lua --mysql-table-engine=tokudb --oltp-tables-count=1 --oltp-table-size=160000000 --rand-init=on --rand-type=uniform --num_threads=24 --report-interval=30 --max-requests=0 --max-time=10800 --percentile=99 --mysql-socket=$(PWD)/var/mysql.sock --mysql-user=root run

    mysqld/TokuDB configuration:

        innodb_buffer_pool_size=5242880
        tokudb_directio=on
        tokudb_empty_scan=disabled
        tokudb_commit_sync=off
        tokudb_cache_size=8G
        tokudb_checkpointing_period=300
        tokudb_checkpoint_pool_threads=1
        tokudb_enable_partial_eviction=off
        tokudb_fsync_log_period=1000
        tokudb_fanout=8
        tokudb_block_size=8K
        tokudb_read_block_size=1K
        tokudb_row_format=tokudb_uncompressed
        tokudb_cleaner_period=1
        tokudb_cleaner_iterations=10000

    So as you can see: amazing results, right? Sustained throughput, immensely better response time and better utilization of available CPU resources. Of course, this is all fake with a tree shape that no sane user would implement. It illustrates what happens when the linear list contains small holes: exactly what we set out to fix!

    Closing

    Look for this improvement to appear in Percona Server 5.6.32-78.0 and 5.7.14-7. It’s a good one for you if you have huge TokuDB data files with lots and lots of nodes.

    Credits!

    Throughout this post, I referred to “we” numerous times. That “we” encompasses a great many people that have looked into this in the past and implemented the current solution. Some are current and former Percona and Tokutek employees that you may already know by name. Some are newer at Percona.
I got to take their work and research, incorporate it into the current codebase, test and benchmark it, and report it here for all to see. Many thanks go out to Jun Yuan, Leif Walsh, John Esmet, Rich Prohaska, Bradley Kuszmaul, Alexey Stroganov, Laurynas Biveinis, Vlad Lesin, Christian Rober and others for all of the effort in diagnosing this issue, inventing a solution, and testing and reviewing this change to the PerconaFT library.
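    To make the block allocator discussion in this post more concrete, here is a minimal Python sketch that contrasts the linear first-fit scan with a max-hole-size lookup, using the five-block example from the post. It is only an illustration under stated assumptions: the names (holes, first_fit, Node, build, tree_fit) are hypothetical, the tree is a static balanced binary tree built from a sorted list rather than the red-black tree used in PerconaFT, and it always returns the lowest-offset hole that fits, which may differ from the real allocator's choice.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

Block = Tuple[int, int]  # (offset, size), sorted ascending by offset


def holes(blocks: List[Block]) -> List[Block]:
    """Holes between adjacent blocks, using the formula from the post."""
    result = []
    for cur, nxt in zip(blocks, blocks[1:]):
        start = cur[0] + cur[1]                 # b[n].offset + b[n].size
        result.append((start, nxt[0] - start))  # b[n+1].offset - start
    return result


def first_fit(blocks: List[Block], size: int) -> Optional[int]:
    """Linear scan over all holes: O(n) in the number of blocks."""
    for offset, hole_size in holes(blocks):
        if hole_size >= size:
            return offset
    return None


@dataclass
class Node:
    block: Block
    hole: int       # size of the hole right after this block (0 for the last block)
    max_hole: int   # largest hole found anywhere in this subtree
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def build(blocks: List[Block], lo: int = 0, hi: Optional[int] = None) -> Optional[Node]:
    """Build a balanced tree ordered by file offset (hypothetical helper)."""
    if hi is None:
        hi = len(blocks)
    if lo >= hi:
        return None
    mid = (lo + hi) // 2
    offset, size = blocks[mid]
    hole = blocks[mid + 1][0] - (offset + size) if mid + 1 < len(blocks) else 0
    node = Node(blocks[mid], hole, hole, build(blocks, lo, mid), build(blocks, mid + 1, hi))
    for child in (node.left, node.right):
        if child is not None:
            node.max_hole = max(node.max_hole, child.max_hole)
    return node


def tree_fit(node: Optional[Node], size: int) -> Optional[int]:
    """Descend only into subtrees whose max_hole can satisfy the request."""
    while node is not None and node.max_hole >= size:
        if node.left is not None and node.left.max_hole >= size:
            node = node.left
        elif node.hole >= size:
            return node.block[0] + node.block[1]
        else:
            node = node.right
    return None


blocks = [(0, 1), (3, 2), (6, 3), (10, 5), (20, 8)]
tree = build(blocks)
print(holes(blocks))       # the four holes listed in the post
print(tree_fit(tree, 4))   # offset of a hole that can hold 4 bytes
```

    In this sketch the search for a 4-byte hole skips the left subtree entirely because its recorded maximum hole is too small, which is the effect the max hole size annotation is meant to achieve.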

  • MySQL Document Store: unstructured data, unstructured search
    Storing documents is convenient: no need to define a schema up-front, no downtime for schema changes, no normalization, no slow joins: true or not, you name it. But what about search if you do not know how your data is structured? For example, the famous SFW-construct requires listing the columns to search: SELECT … FROM … WHERE some_column = 'Jippie'. Given that JSON data can have no schema, how do you write a query without knowing any field names? Where is SELECT … FROM … WHERE * = 'Jippie'? JSON_SEARCH() gets you started, but there are gaps if you think about it for a minute.

    There are many reasons why you may not know the structure of the data you operate on. Maybe you have gathered documents using different “schema versions” over time, or maybe there is simply no common structure because the data comes from a variety of different sources. This poses two challenges:

      • how to search not knowing:
          • field names
          • table/collection/index names
      • how to optimize search and storage not knowing:
          • schema/structure of documents
          • cardinalities and space requirements of fields

    As we will see, the MySQL Document Store does not have answers to all questions.

    Search not knowing field names: JSON_SEARCH()

    The SQL language is made for typed and well-structured data. The ANSI/ISO take on JSON is not trying to integrate it deeply into the type system of SQL but to use functions and a path language to work on JSON. A function-based approach is a low-risk approach for vendors, and it works for this particular task too. JSON_SEARCH() scans a document for matching strings.

    All of the following examples use two identical tables holding 50,000 and 1,000,000 randomly generated JSON product descriptions, respectively. The table definition is very basic:

        CREATE TABLE products (
          product_id INT NOT NULL AUTO_INCREMENT,
          product JSON DEFAULT NULL,
          PRIMARY KEY(product_id)
        )

    JSON_SEARCH takes a JSON document and a search term, and returns the path to the value.
    The search term supports the wildcards also supported by LIKE. You can use “%” to match any number of characters and “_” to match one arbitrary character:

        SELECT
          JSON_SEARCH(product, "all", "Beach")  -- search "document"
        FROM products                           -- table/collection/index is known
        WHERE
          JSON_SEARCH(product, "all", "Beach") IS NOT NULL

        +--------------------------------------+
        | JSON_SEARCH(product, "all", "Beach") |
        +--------------------------------------+
        | "$.title"                            |
        +--------------------------------------+

        SELECT
          JSON_SEARCH(
            product,
            "all",                              -- return path to all matches
            "%b%"
          )
        FROM products
        WHERE
          JSON_SEARCH(product, "one", "%b%") IS NOT NULL
        LIMIT 2

        +------------------------------------+
        | JSON_SEARCH(product, "all", "%b%") |
        +------------------------------------+
        | ["$.title", "$.description"]       |
        | ["$.title", "$.description"]       |
        +------------------------------------+

    The function can either return the path to the first match in a document or a list of path expressions to all matches. The return value is of type JSON. This means there’s a bit of type juggling required if you want to use the path to actually fetch the matching field value:

        SELECT
          JSON_EXTRACT(
            product,
            JSON_UNQUOTE(
              JSON_SEARCH(product, "one", "Beach")
            )
          ) AS _match
        FROM products
        WHERE
          JSON_SEARCH(product, "all", "Beach") IS NOT NULL

        +---------+
        | _match  |
        +---------+
        | "Beach" |
        +---------+

    I am unaware of any SQL statement which gives you the value for all matching path expressions returned by JSON_SEARCH(…, "all", …). At the time of writing, MySQL lacks a group of nest/unnest operators that can extract rows from JSON arrays/objects. Once such functionality is available, it should become possible to extract all matching values.

    There are two more issues with JSON_SEARCH: it is limited to string matching and it is slow.
    If, for example, you know that somewhere in your JSON documents there is a field that holds a numeric value representing a year, but you don’t know the field/path, then it becomes tricky and complex to search for it.

    JSON string search using a fulltext index

    Given that JSON_SEARCH is limited to string matching, one may be tempted to replace it with a fulltext index search. A fulltext index based search is much faster than a full table scan performed by JSON_SEARCH. Problem is, MySQL does not support creating a fulltext index for an entire JSON document:

        ALTER TABLE products ADD FULLTEXT idx_ft_product(product);
        ERROR 3152 (42000): JSON column 'product' cannot be used in key specification.

    A possible workaround is to extract all string columns from a JSON column and concatenate their values. Ideally, this would be possible using a virtual generated column to save disk space. Problem is, MySQL will not create a fulltext index on a virtual generated column:

        ALTER TABLE products ADD facet_text TEXT GENERATED ALWAYS AS(
          CONCAT_WS(
            " ",
            JSON_UNQUOTE(JSON_EXTRACT(product, "$.title")),
            JSON_UNQUOTE(JSON_EXTRACT(product, "$.description"))
          )
        );

        ALTER TABLE products ADD FULLTEXT idx_ft_facet_text(facet_text);
        ERROR 3106 (HY000): 'Fulltext index on virtual generated column' is not supported for generated columns.

    This problem can be solved using a stored generated column, but we are running in circles. The task is to find information in unstructured data. How do you know which values to extract for indexing if you do not know about the structure? Elasticsearch solves it by flipping the logic upside down: it indexes all strings from a JSON document unless you tell it to ignore some of them. I wouldn’t be surprised to learn the MySQL JSON fulltext indexing issue can be solved with something as simple as a custom fulltext parser.
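    To illustrate the "index all strings unless told otherwise" idea outside of SQL, here is a small Python sketch that walks a JSON document of unknown structure and concatenates every string value it finds. It is an assumption about how an application could prepare text for a regular TEXT column carrying a fulltext index, not something MySQL or Elasticsearch does in this form; the function names iter_strings and facet_text are made up for the example.

```python
import json
from typing import Any, Iterator


def iter_strings(value: Any) -> Iterator[str]:
    """Yield every string value in a decoded JSON document, at any depth."""
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for child in value.values():
            yield from iter_strings(child)
    elif isinstance(value, list):
        for child in value:
            yield from iter_strings(child)
    # numbers, booleans and null are skipped on purpose


def facet_text(document_json: str) -> str:
    """Concatenate all string values, without knowing any field names."""
    return " ".join(iter_strings(json.loads(document_json)))


doc = '{"title": "Beach", "price": 9.5, "tags": ["sun", {"note": "sand"}]}'
print(facet_text(doc))
```

    Unlike the generated column above, this does not need to name $.title or $.description, at the cost of doing the extraction in the application.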
    Figuring out the string fields in JSON

    There is no solution, but at least you can figure out which fields to extract and index by analyzing a given data set. Looking at existing data does not help you with future unstructured data, but it is better than nothing.

    Recall that JSON_SEARCH accepts wildcards and returns path expressions. Use “%” to produce a list of all matching path expressions. With the help of the path expressions, it becomes possible to create a stored generated column and add a fulltext index on it.

        SELECT
          COUNT(*),
          JSON_SEARCH(product, "all", "%") AS _string_paths
        FROM products
        GROUP BY _string_paths

        +----------+-------------------------------------------------------------------------------------------------+
        | COUNT(*) | _string_paths                                                                                   |
        +----------+-------------------------------------------------------------------------------------------------+
        |        1 | ["$.title", "$.description"]                                                                    |
        |     3256 | ["$.size", "$.title", "$.material", "$.description", "$.in_stock_since"]                        |
        |    13296 | ["$.size", "$.title", "$.usage", "$.material", "$.description", "$.in_stock_since"]             |
        |     6840 | ["$.size", "$.title", "$.gender", "$.material", "$.description", "$.in_stock_since"]            |
        |    26607 | ["$.size", "$.title", "$.usage", "$.gender", "$.material", "$.description", "$.in_stock_since"] |
        +----------+-------------------------------------------------------------------------------------------------+

    Be warned that this results in a full table scan and the aggregation may cause the use of a temporary table. If your data set is large and your disks are slow, like on my VM, mind your queries.

    How to manage what you don’t know?

    From this little example it becomes clear how valuable information on the structure or schema of existing JSON data is. Query optimizers like to use statistics to decide on the optimal query, and I can imagine that a DBA would be interested in such information as well. The little JSON_SEARCH trick tells me which string fields there are in my JSON data.
    But what about searching all documents that have a number holding a value of 1974? Or, how do you notice that an array inside a document has grown to a size where refactoring the storage is beneficial? A SQL user has the INFORMATION_SCHEMA at hand to learn about tables, columns and storage space requirements, but JSON is a black box in that world. JSON columns report the total or average size of the entire document they hold. But there is no way to look inside them other than fetching all the documents and building up some statistics on the application side.

    Lastly, what if you do not even know the table to search? If you take it to the extreme, why can I not do: SELECT … FROM * WHERE * = 'Oh, no'? Why do I have to name the table? Imagine we would support joins between collections. You could choose whether you want to nest data and stuff everything in one document, or normalize your data to avoid data duplication and update anomalies. JSON_SEARCH() can search all the strings in your nested data. The very moment you normalize your data and spread the nested data over two collections, you lose that feature. You now have to run two queries using JSON_SEARCH against the two collections holding the data and merge the results. How inconvenient… why not allow searching tables “linked” by foreign keys using one query?

    That use case is a little aside from the original question, as it assumes some structure in your data, once again. However, the question remains valid: do we need unstructured search given our unstructured data?

    Happy hacking!

    @Ulf_Wendel

    The post MySQL Document Store: unstructured data, unstructured search appeared first on Ulf Wendel.
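    As a rough illustration of the "fetch the documents and look inside them on the application side" fallback mentioned in this post, the following Python sketch reports the paths of all values equal to a given target, including numbers such as 1974 that JSON_SEARCH cannot match. The function name find_paths is hypothetical, and the path syntax only imitates MySQL's "$.field[index]" style; keys that would need quoting in a real MySQL path are not handled.

```python
from typing import Any, Iterator


def find_paths(value: Any, target: Any, path: str = "$") -> Iterator[str]:
    """Yield a path for every scalar in `value` that equals `target`."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from find_paths(child, target, f"{path}.{key}")
    elif isinstance(value, list):
        for index, child in enumerate(value):
            yield from find_paths(child, target, f"{path}[{index}]")
    elif type(value) is type(target) and value == target:
        # The type check keeps the string "1974" and the boolean True
        # from matching the integers 1974 and 1.
        yield path


product = {
    "title": "Beach",
    "year": 1974,
    "variants": [{"released": 1974}, {"released": "1974"}],
}
print(list(find_paths(product, 1974)))
print(list(find_paths(product, "Beach")))
```

    Running the same walk over every fetched document and counting the paths it yields would be one way to build the kind of per-field statistics the post says are missing.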

  • ClusterControl Tips & Tricks - Transparent Database Failover for your Applications
    ClusterControl is a great tool to deploy and manage database clusters. If you are into MySQL, you can easily deploy clusters based on traditional MySQL master-slave replication, Galera Cluster or MySQL NDB Cluster. To achieve high availability, deploying a cluster is not enough though. Nodes may (and most probably will) go down, and your system has to be able to adapt to those changes.

    This adaptation can happen at different levels. You can implement some kind of logic within the application: it would check the state of cluster nodes and direct traffic to the ones which are reachable at the given moment. You can also build a proxy layer which will implement high availability in your system. In this blog post, we’d like to share some tips on how you can achieve that using ClusterControl.

    Deploying HAProxy using ClusterControl

    HAProxy is the standard: one of the most popular proxies used in connection with MySQL (but not only, of course). ClusterControl supports deployment and monitoring of HAProxy nodes. It also helps to implement high availability of the proxy itself using keepalived.

    Deployment is pretty simple. You need to pick or fill in the IP address of a host where HAProxy will be installed, pick the port and the load balancing policy, and decide if ClusterControl should use an existing repository or the most recent source code to deploy HAProxy. You can also pick which backend nodes you’d like to have included in the proxy configuration, and whether they should be active or backup.

    By default, the HAProxy instance deployed by ClusterControl won’t work correctly with a master-slave replication setup. It’s designed to implement a round-robin type of load balancing (e.g., for Galera Cluster where all nodes are writeable). There’s a way to go around this issue, though: in the following repository you can find a check script which is intended to work with MySQL Replication. You will need to replace the check deployed by ClusterControl with this particular file.
    Keepalived is used to add high availability to the proxy layer. When you have at least two HAProxy nodes in your system, you can install Keepalived from the ClusterControl UI. You’ll have to pick two HAProxy nodes and they will be configured as an active-standby pair. A Virtual IP (VIP) will be assigned to the active server and, should it fail, it will be reassigned to the standby proxy. This way you can just connect to the VIP and all your queries will be routed to the currently active and working HAProxy node. You can find more details on how the internals are configured by reading through our HAProxy tutorial.

    Deploying MaxScale using ClusterControl

    While HAProxy is a rock-solid proxy and a very popular choice, it lacks database awareness, e.g., read-write split. The only way to do it in HAProxy is to create two backends and listen on two ports: one for reads and one for writes. This is usually fine, but it requires you to implement changes in your application. The application has to understand what is a read and what is a write, and then direct those queries to the correct port. It’d be much easier to just connect to a single port and let the proxy decide what to do next. This is something HAProxy cannot do, as it just routes packets: no packet inspection is done and, especially, it has no understanding of the MySQL protocol.

    MaxScale solves this problem: it talks the MySQL protocol and it can (among other things) perform a read-write split. Installation of MaxScale from ClusterControl is simple. Go to the Manage -> Load Balancer section and fill in the “Install MaxScale” tab with the required data.

    In short, we need to pick where MaxScale will be installed, what admin user and password it should have, and which user it should use to connect to the database. Next, we can pick the number of threads MaxScale should use, the ports, and which nodes should be added to the load balancer.

    By default MaxScale is configured with two ways of accessing the database.
    You can use the Round Robin listener on port 4006, which will split connections between the available nodes in a round-robin fashion. If you want to use MaxScale’s ability to perform a read/write split, you need to connect to port 4008. Once connected, MaxScale will begin to parse your MySQL traffic and route it according to what queries you execute. In short, SELECT queries will be routed to slaves (or, in the case of Galera Cluster, all nodes except the one picked as a master), and the remaining traffic will hit the master. Explicitly opened transactions will also open on the master only.

    In MySQL replication, the master is self-explanatory: it’s the master node which will be used by MaxScale. In Galera Cluster things are slightly different, as it’s a multi-master environment. MaxScale checks the wsrep_local_index value on all Galera nodes, and the one with the lowest index will be treated as a master. In case the master goes down, the node with the next lowest value is picked.

    MaxScale, like every proxy, can become a single point of failure and it has to be made redundant to achieve high availability. There are a couple of methods to do that. One of them is to collocate MaxScale on the web nodes. The idea here is that, most of the time, the MaxScale process will work just fine and the reason for its unavailability is that the whole node went down. In such a case, if MaxScale is collocated with the web node, not much harm has been done because that particular web node will not be available either.

    Another method, not supported directly from the ClusterControl UI, is to use Keepalived in a similar way to what we did in the case of HAProxy.

    You can do most of the hard work from the UI. First, deploy MaxScale on two hosts. Then, deploy HAProxy on the same two nodes, so that ClusterControl will allow you to install Keepalived. That’s what you need to do next: install Keepalived, making sure the IP addresses of HAProxy you picked are the same as the IPs of your MaxScale nodes.
    Once Keepalived has been set up by ClusterControl, you can just remove the HAProxy nodes.

    The next step requires CLI access. You need to log into both MaxScale (and Keepalived) nodes and edit the Keepalived configuration. On CentOS it is located in /etc/keepalived/keepalived.conf. What you need to do is, basically, pass it through sed 's/haproxy/maxscale/g' to replace all mentions of haproxy with maxscale, and then apply the changes by restarting Keepalived on both nodes. The last step is to edit MaxScale’s configuration and make sure it listens on either the Virtual IP or on all interfaces. Restart MaxScale to apply your changes and you are all set: your MaxScale health will be monitored by Keepalived and, in case of failure of the active node, the Virtual IP will be moved to the standby node.

    Given how MaxScale decides which one of the nodes in a Galera Cluster is a “master”, all MaxScale nodes will see the cluster in the same way as long as you do not use any stickiness. (By default, the master node is the node with the lowest wsrep_local_index, which means that the old master will become a master again if it ever comes back online. You can change that so MaxScale sticks to the new master.) You just have to make sure all nodes of the cluster have been added to MaxScale.

    Tags: MySQL, MariaDB, haproxy, MaxScale, keepalived, high availability, replication, load balancer, failover
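    The "lowest wsrep_local_index wins" rule described in this post can be pictured with a few lines of Python. This is only a sketch of the selection rule as the post states it, without stickiness; it is not MaxScale code, and the function name pick_master, the node names and the convention of using None for an unreachable node are assumptions made for the example.

```python
from typing import Dict, Optional


def pick_master(local_index_by_node: Dict[str, Optional[int]]) -> Optional[str]:
    """Return the reachable node with the lowest wsrep_local_index.

    A value of None stands for a node that could not be reached.
    """
    reachable = {
        node: index
        for node, index in local_index_by_node.items()
        if index is not None
    }
    if not reachable:
        return None
    return min(reachable, key=reachable.get)


cluster = {"db1": 0, "db2": 1, "db3": 2}
print(pick_master(cluster))                  # all nodes up

cluster_after_failure = {"db1": None, "db2": 1, "db3": 2}
print(pick_master(cluster_after_failure))    # db1 is down
```

    Because the rule depends only on values every proxy can read from the cluster, two independent MaxScale instances evaluating it reach the same answer, which is why the post notes that all MaxScale nodes see the cluster in the same way.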

  • How Apache Spark makes your slow MySQL queries 10x faster (or more)
    In this blog post, we’ll discuss how to improve the performance of slow MySQL queries using Apache Spark.

    Introduction

    In my previous blog post, I wrote about using Apache Spark with MySQL for data analysis and showed how to transform and analyze a large volume of data (text files) with Apache Spark. Vadim also performed a benchmark comparing the performance of MySQL and Spark with the Parquet columnar format (using Air traffic performance data). That works great, but what if we don’t want to move our data from MySQL to another storage (i.e., columnar format), and instead want to use “ad hoc” queries on top of an existing MySQL server? Apache Spark can help here as well.

    TL;DR version: Using Apache Spark on top of the existing MySQL server(s) (without the need to export or even stream data to Spark or Hadoop), we can increase query performance more than ten times. Using multiple MySQL servers (replication or Percona XtraDB Cluster) gives us an additional performance increase for some queries. You can also use the Spark cache function to cache the whole MySQL query results table.

    The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect it directly to MySQL and run the queries. Why is this faster? For long-running (i.e., reporting or BI) queries, it can be much faster as Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all cores on all cluster nodes. In my examples below, MySQL queries are executed inside Spark and run 5-10 times faster (on top of the same MySQL data).

    In addition, Spark can add “cluster” level parallelism. In the case of MySQL replication or Percona XtraDB Cluster, Spark can split the query into a set of smaller queries (in the case of a partitioned table it will run one query per partition, for example) and run those in parallel across multiple slave servers or multiple Percona XtraDB Cluster nodes.
    Finally, it will use a map/reduce type of processing to aggregate the results.

    I’ve used the same “Airlines On-Time Performance” database as in previous posts. Vadim created some scripts to download data and upload it to MySQL. You can find the scripts here: https://github.com/Percona-Lab/ontime-airline-performance. I’ve also used Apache Spark 2.0, which was released July 26, 2016.

    Apache Spark Setup

    Starting Apache Spark in standalone mode is easy. To recap:

      • Download Apache Spark 2.0 and place it somewhere.
      • Start the master.
      • Start a slave (worker) and attach it to the master.
      • Start the app (in this case spark-shell or spark-sql).

    Example:

        root@thor:~/spark# ./sbin/start-master.sh
        less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
        15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
        15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
        15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
        root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

    To connect to Spark we can use spark-shell (Scala), pyspark (Python) or spark-sql. Since spark-sql is similar to the MySQL cli, using it would be the easiest option (even “show tables” works). I also wanted to work with Scala in interactive mode, so I’ve used spark-shell as well. In all the examples I’m using the same SQL query in MySQL and Spark, so working with Spark is not that different.

    To work with a MySQL server in Spark we need Connector/J for MySQL.
    Download the package and copy the mysql-connector-java-5.1.39-bin.jar to the Spark directory, then add the class path to conf/spark-defaults.conf:

        spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
        spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar

    Running MySQL queries via Apache Spark

    For this test I was using one physical server with 12 CPU cores (older Intel(R) Xeon(R) CPU L5639 @ 2.13GHz), 48G of RAM and SSD disks. I’ve installed MySQL and started the Spark master and Spark slave on the same box. Now we are ready to run MySQL queries inside Spark. First, start the shell (from the Spark directory, /usr/local/spark in my case):

        $ ./bin/spark-shell --driver-memory 4G --master spark://server1:7077

    Then we will need to connect to MySQL from Spark and register the temporary view:

        val jdbcDF = spark.read.format("jdbc").options(
          Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
          "dbtable" -> "ontime.ontime_part",
          "fetchSize" -> "10000",
          "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
          )).load()
        jdbcDF.createOrReplaceTempView("ontime")

    So we have created a “datasource” for Spark (or in other words, a “link” from Spark to MySQL). The Spark table name is “ontime” (linked to the MySQL ontime.ontime_part table) and we can run SQL queries in Spark, which in turn parses them and translates them into MySQL queries. “partitionColumn” is very important here. It tells Spark to run multiple queries in parallel, one query per partition.
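To make the effect of “partitionColumn”, “lowerBound”, “upperBound” and “numPartitions” more concrete, here is a minimal plain-Python sketch of how the per-partition WHERE fragments could be derived. It is an illustrative re-implementation modelled on the behaviour visible in this post's logs and process list, not Spark's own code; the function name jdbc_partition_predicates and the "1 = 1" placeholder are assumptions made for the example.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE-clause fragment per parallel query.

    Illustrative sketch only (hypothetical helper, not part of Spark).
    """
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")

    # Spark 2.0 logs a warning and reduces the partition count when more
    # partitions are requested than the bound range is wide.
    n = min(num_partitions, upper_bound - lower_bound)
    if n <= 1:
        return ["1 = 1"]  # assumption: stands in for a single unrestricted query

    # Integer stride between consecutive partition boundaries.
    stride = upper_bound // n - lower_bound // n

    predicates = []
    current = lower_bound
    for i in range(n):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != n - 1 else None
        if upper is None:
            # Last partition is open-ended: upper_bound does not filter rows.
            predicates.append(lower)
        elif lower is None:
            # First partition also picks up everything below it, plus NULLs.
            predicates.append(f"{upper} or {column} is null")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


if __name__ == "__main__":
    # Bounds taken from the spark-shell example above.
    for predicate in jdbc_partition_predicates("yeard", 1988, 2016, 28):
        print(predicate)
```

With the bounds from the example above this yields 28 fragments, from “yeard < 1989 or yeard is null” through “yeard >= 2015”. Note that the bounds only decide where the ranges are cut: rows outside lowerBound/upperBound still land in the first or last partition.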
    Now we can run the query:

        val sqlDF = sql("select min(year), max(year) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin = 'RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 100000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10")
        sqlDF.show()

    MySQL Query Example

    Let’s go back to MySQL for a second and look at the query example. I’ve chosen the following query (from my older blog post):

        select min(year), max(year) as max_year, Carrier, count(*) as cnt,
        sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
        round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
        FROM ontime
        WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI')
        GROUP by carrier
        HAVING cnt > 100000 and max_year > '1990'
        ORDER by rate DESC, cnt desc
        LIMIT 10

    The query will find the total number of delayed flights per airline. In addition, the query will calculate the smart “ontime” rating, taking into consideration the number of flights (we do not want to compare smaller air carriers with the large ones, and we want to exclude the older airlines that are not in business anymore). The main reason I’ve chosen this query is that it is hard to optimize in MySQL. The conditions in the “where” clause are not selective: ~70% of the rows still match them.
I’ve done a basic calculation:mysql> select count(*) FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI'); +-----------+ | count(*) | +-----------+ | 108776741 | +-----------+ mysql> select count(*) FROM ontime; +-----------+ | count(*) | +-----------+ | 152657276 | +-----------+ mysql> select round((108776741/152657276)*100, 2); +-------------------------------------+ | round((108776741/152657276)*100, 2) | +-------------------------------------+ | 71.26 | +-------------------------------------+Table structure:CREATE TABLE `ontime_part` ( `YearD` int(11) NOT NULL, `Quarter` tinyint(4) DEFAULT NULL, `MonthD` tinyint(4) DEFAULT NULL, `DayofMonth` tinyint(4) DEFAULT NULL, `DayOfWeek` tinyint(4) DEFAULT NULL, `FlightDate` date DEFAULT NULL, `UniqueCarrier` char(7) DEFAULT NULL, `AirlineID` int(11) DEFAULT NULL, `Carrier` char(2) DEFAULT NULL, `TailNum` varchar(50) DEFAULT NULL, ... `id` int(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`,`YearD`), KEY `covered` (`DayOfWeek`,`OriginState`,`DestState`,`Carrier`,`YearD`,`ArrDelayMinutes`) ) ENGINE=InnoDB AUTO_INCREMENT=162668935 DEFAULT CHARSET=latin1 /*!50100 PARTITION BY RANGE (YearD) (PARTITION p1987 VALUES LESS THAN (1988) ENGINE = InnoDB, PARTITION p1988 VALUES LESS THAN (1989) ENGINE = InnoDB, PARTITION p1989 VALUES LESS THAN (1990) ENGINE = InnoDB, PARTITION p1990 VALUES LESS THAN (1991) ENGINE = InnoDB, PARTITION p1991 VALUES LESS THAN (1992) ENGINE = InnoDB, PARTITION p1992 VALUES LESS THAN (1993) ENGINE = InnoDB, PARTITION p1993 VALUES LESS THAN (1994) ENGINE = InnoDB, PARTITION p1994 VALUES LESS THAN (1995) ENGINE = InnoDB, PARTITION p1995 VALUES LESS THAN (1996) ENGINE = InnoDB, PARTITION p1996 VALUES LESS THAN (1997) ENGINE = InnoDB, PARTITION p1997 VALUES LESS THAN (1998) ENGINE = InnoDB, PARTITION p1998 VALUES LESS THAN (1999) ENGINE = InnoDB, PARTITION p1999 VALUES LESS THAN (2000) ENGINE = InnoDB, PARTITION p2000 VALUES LESS 
THAN (2001) ENGINE = InnoDB, PARTITION p2001 VALUES LESS THAN (2002) ENGINE = InnoDB, PARTITION p2002 VALUES LESS THAN (2003) ENGINE = InnoDB, PARTITION p2003 VALUES LESS THAN (2004) ENGINE = InnoDB, PARTITION p2004 VALUES LESS THAN (2005) ENGINE = InnoDB, PARTITION p2005 VALUES LESS THAN (2006) ENGINE = InnoDB, PARTITION p2006 VALUES LESS THAN (2007) ENGINE = InnoDB, PARTITION p2007 VALUES LESS THAN (2008) ENGINE = InnoDB, PARTITION p2008 VALUES LESS THAN (2009) ENGINE = InnoDB, PARTITION p2009 VALUES LESS THAN (2010) ENGINE = InnoDB, PARTITION p2010 VALUES LESS THAN (2011) ENGINE = InnoDB, PARTITION p2011 VALUES LESS THAN (2012) ENGINE = InnoDB, PARTITION p2012 VALUES LESS THAN (2013) ENGINE = InnoDB, PARTITION p2013 VALUES LESS THAN (2014) ENGINE = InnoDB, PARTITION p2014 VALUES LESS THAN (2015) ENGINE = InnoDB, PARTITION p2015 VALUES LESS THAN (2016) ENGINE = InnoDB, PARTITION p_new VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */Even with a “covered” index, MySQL will have to scan ~70M-100M of rows and create a temporary table:mysql> explain select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10G *************************** 1. 
row *************************** id: 1 select_type: SIMPLE table: ontime_part type: range possible_keys: covered key: covered key_len: 2 ref: NULL rows: 70483364 Extra: Using where; Using index; Using temporary; Using filesort 1 row in set (0.00 sec)What is the query response time in MySQL:mysql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10; +------------+----------+---------+----------+-----------------+------+ | min(yearD) | max_year | Carrier | cnt | flights_delayed | rate | +------------+----------+---------+----------+-----------------+------+ | 2003 | 2013 | EV | 2962008 | 464264 | 0.16 | | 2003 | 2013 | B6 | 1237400 | 187863 | 0.15 | | 2006 | 2011 | XE | 1615266 | 230977 | 0.14 | | 2003 | 2005 | DH | 501056 | 69833 | 0.14 | | 2001 | 2013 | MQ | 4518106 | 605698 | 0.13 | | 2003 | 2013 | FL | 1692887 | 212069 | 0.13 | | 2004 | 2010 | OH | 1307404 | 175258 | 0.13 | | 2006 | 2013 | YV | 1121025 | 143597 | 0.13 | | 2003 | 2006 | RU | 1007248 | 126733 | 0.13 | | 1988 | 2013 | UA | 10717383 | 1327196 | 0.12 | +------------+----------+---------+----------+-----------------+------+ 10 rows in set (19 min 16.58 sec)19 minutes is definitely not great. SQL in Spark Now we want to run the same query inside Spark and let Spark read data from MySQL. 
We will create a “datasource” and execute the query:scala> val jdbcDF = spark.read.format("jdbc").options( | Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql", | "dbtable" -> "ontime.ontime_sm", | "fetchSize" -> "10000", | "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48" | )).load() 16/08/02 23:24:12 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 27; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2015. dbcDF: org.apache.spark.sql.DataFrame = [id: int, YearD: date ... 19 more fields] scala> jdbcDF.createOrReplaceTempView("ontime") scala> val sqlDF = sql("select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10") sqlDF: org.apache.spark.sql.DataFrame = [min(yearD): date, max_year: date ... 4 more fields] scala> sqlDF.show() +----------+--------+-------+--------+---------------+----+ |min(yearD)|max_year|Carrier| cnt|flights_delayed|rate| +----------+--------+-------+--------+---------------+----+ | 2003| 2013| EV| 2962008| 464264|0.16| | 2003| 2013| B6| 1237400| 187863|0.15| | 2006| 2011| XE| 1615266| 230977|0.14| | 2003| 2005| DH| 501056| 69833|0.14| | 2001| 2013| MQ| 4518106| 605698|0.13| | 2003| 2013| FL| 1692887| 212069|0.13| | 2004| 2010| OH| 1307404| 175258|0.13| | 2006| 2013| YV| 1121025| 143597|0.13| | 2003| 2006| RU| 1007248| 126733|0.13| | 1988| 2013| UA|10717383| 1327196|0.12| +----------+--------+-------+--------+---------------+----+spark-shell does not show the query time. 
This can be retrieved from Web UI or from spark-sql. I’ve re-run the same query in spark-sql:./bin/spark-sql --driver-memory 4G --master spark://thor:7077 spark-sql> CREATE TEMPORARY VIEW ontime > USING org.apache.spark.sql.jdbc > OPTIONS ( > url "jdbc:mysql://localhost:3306/ontime?user=root&password=", > dbtable "ontime.ontime_part", > fetchSize "1000", > partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "48" > ); 16/08/04 01:44:27 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 26; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2014. Time taken: 3.864 seconds spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10; 16/08/04 01:45:13 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf. 2003 2013 EV 2962008 464264 0.16 2003 2013 B6 1237400 187863 0.15 2006 2011 XE 1615266 230977 0.14 2003 2005 DH 501056 69833 0.14 2001 2013 MQ 4518106 605698 0.13 2003 2013 FL 1692887 212069 0.13 2004 2010 OH 1307404 175258 0.13 2006 2013 YV 1121025 143597 0.13 2003 2006 RU 1007248 126733 0.13 1988 2013 UA 10717383 1327196 0.12 Time taken: 139.628 seconds, Fetched 10 row(s)So the response time of the same query is almost 10x faster (on the same server, just one box). But now how was this query translated to MySQL queries, and why it is so much faster? 
Here is what is happening inside MySQL: Inside MySQL Spark:scala> sqlDF.show() [Stage 4:> (0 + 26) / 26]MySQL:mysql> select id, info from information_schema.processlist where info is not NULL and info not like '%information_schema%'; +-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | id | info | +-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | 10948 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002) | | 10965 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2007 AND yearD < 2008) | | 10966 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1991 AND yearD < 1992) | | 10967 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1994 AND yearD < 1995) | | 10968 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN 
('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1998 AND yearD < 1999) | | 10969 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2010 AND yearD < 2011) | | 10970 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2002 AND yearD < 2003) | | 10971 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2006 AND yearD < 2007) | | 10972 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1990 AND yearD < 1991) | | 10953 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2009 AND yearD < 2010) | | 10947 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1993 AND yearD < 1994) | | 10956 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD < 1989 or yearD is null) | | 10951 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 
7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2005 AND yearD < 2006) | | 10954 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1996 AND yearD < 1997) | | 10955 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2008 AND yearD < 2009) | | 10961 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1999 AND yearD < 2000) | | 10962 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2011 AND yearD < 2012) | | 10963 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2003 AND yearD < 2004) | | 10964 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1995 AND yearD < 1996) | | 10957 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2004 AND yearD < 2005) | | 10949 | SELECT 
`YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1989 AND yearD < 1990) | | 10950 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1997 AND yearD < 1998) | | 10952 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2013) | | 10958 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1992 AND yearD < 1993) | | 10960 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2000 AND yearD < 2001) | | 10959 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2012 AND yearD < 2013) | +-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 26 rows in set (0.00 sec)Spark is running 26 queries in parallel, which is great. 
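Each of the queries above returns only the YearD, ArrDelayMinutes and Carrier columns; the GROUP BY, HAVING, ORDER BY and LIMIT parts of the original statement are applied on the Spark side. The following plain-Python sketch (not Spark code) illustrates that map/reduce style: aggregate each partition's rows separately, then merge the partial results. The function names and the sample rows in the usage note are made up for illustration.

```python
def partial_aggregate(rows):
    """Map step: aggregate one partition's (year, arr_delay, carrier) rows."""
    acc = {}
    for year, arr_delay, carrier in rows:
        s = acc.setdefault(
            carrier, {"min_year": year, "max_year": year, "cnt": 0, "delayed": 0}
        )
        s["min_year"] = min(s["min_year"], year)
        s["max_year"] = max(s["max_year"], year)
        s["cnt"] += 1
        # Mirrors sum(if(ArrDelayMinutes>30, 1, 0)); a NULL delay counts as 0.
        if arr_delay is not None and arr_delay > 30:
            s["delayed"] += 1
    return acc


def merge(partials):
    """Reduce step: combine per-partition results into one entry per carrier."""
    total = {}
    for part in partials:
        for carrier, s in part.items():
            t = total.setdefault(
                carrier,
                {"min_year": s["min_year"], "max_year": s["max_year"],
                 "cnt": 0, "delayed": 0},
            )
            t["min_year"] = min(t["min_year"], s["min_year"])
            t["max_year"] = max(t["max_year"], s["max_year"])
            t["cnt"] += s["cnt"]
            t["delayed"] += s["delayed"]
    return total


def top_carriers(total, min_cnt, limit=10):
    """HAVING cnt > min_cnt and max_year > 1990, ORDER BY rate DESC, cnt DESC."""
    rows = [
        (carrier, s["min_year"], s["max_year"], s["cnt"], s["delayed"],
         round(s["delayed"] / s["cnt"], 2))
        for carrier, s in total.items()
        if s["cnt"] > min_cnt and s["max_year"] > 1990
    ]
    rows.sort(key=lambda r: (-r[5], -r[3]))
    return rows[:limit]
```

For example, with two tiny made-up partitions, merge([partial_aggregate(p2001), partial_aggregate(p2002)]) followed by top_carriers(..., min_cnt=1) produces one ranked row per carrier, no matter how the input rows were split between the partitions.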
    As the table is partitioned it only uses one partition per query, but scans the whole partition:

        mysql> explain partitions SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002)\G
        *************************** 1. row ***************************
                   id: 1
          select_type: SIMPLE
                table: ontime_part
           partitions: p2001
                 type: ALL
        possible_keys: NULL
                  key: NULL
              key_len: NULL
                  ref: NULL
                 rows: 5814106
                Extra: Using where
        1 row in set (0.00 sec)

    In this case, as the box has 12 CPU cores / 24 threads, it efficiently executes 26 queries in parallel and the partitioned table helps to avoid contention issues (I wish MySQL could scan partitions in parallel, but it can’t at the time of writing).

    Another interesting thing is that Spark can “push down” some of the conditions to MySQL, but only those inside the “where” clause. All group by/order by/aggregations are done inside Spark. It needs to retrieve data from MySQL to satisfy those conditions and will not push down group by/order by/etc. to MySQL. That also means that queries without “where” conditions (for example “select count(*) as cnt, carrier from ontime group by carrier order by cnt desc limit 10”) will have to retrieve all data from MySQL and load it into Spark (as opposed to MySQL doing all of the group by work itself). Running it in Spark might be slower or faster (depending on the amount of data and use of indexes), but it also requires more resources and potentially more memory dedicated to Spark. The above query is translated to 26 queries, each of which does a “select carrier from ontime_part where (yearD >= N AND yearD < N+1)”.

    Pushing down the whole query into MySQL

    If we want to avoid sending all data from MySQL to Spark we have the option of creating a temporary table on top of a query (similar to MySQL’s create temporary table as select …).
In Scala:val tableQuery = "(select yeard, count(*) from ontime group by yeard) tmp" val jdbcDFtmp = spark.read.format("jdbc").options( Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=", "dbtable" -> tableQuery, "fetchSize" -> "10000" )).load() jdbcDFtmp.createOrReplaceTempView("ontime_tmp")In Spark SQL:CREATE TEMPORARY VIEW ontime_tmp USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql", dbtable "(select yeard, count(*) from ontime_part group by yeard) tmp", fetchSize "1000" ); select * from ontime_tmp;Please note: We do not want to use “partitionColumn” here, otherwise we will see 26 queries like this in MySQL: “SELECT yeard, count(*) FROM (select yeard, count(*) from ontime_part group by yeard) tmp where (yearD >= N AND yearD < N)” (obviously not optimal) This is not a good use of Spark, more like a “hack.” The only good reason to do it is to be able to have the result of the query as a source of an additional query. Query cache in Spark Another option is to cache the result of the query (or even the whole table) and then use .filter in Scala for faster processing. This requires sufficient memory dedicated for Spark. The good news is we can add additional nodes to Spark and get more memory for Spark cluster. 
Spark SQL example:CREATE TEMPORARY VIEW ontime_latest USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://localhost:3306/ontime?user=root&password=", dbtable "ontime.ontime_part partition (p2013, p2014)", fetchSize "1000", partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "26" ); cache table ontime_latest; spark-sql> cache table ontime_latest; Time taken: 465.076 seconds spark-sql> select count(*) from ontime_latest; 5349447 Time taken: 0.526 seconds, Fetched 1 row(s) spark-sql> select count(*), dayofweek from ontime_latest group by dayofweek; 790896 1 634664 6 795540 3 794667 5 808243 4 743282 7 782155 2 Time taken: 0.541 seconds, Fetched 7 row(s) spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_latest WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin='RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10; 2013 2013 MQ 9339 1734 0.19 2013 2013 B6 3302 516 0.16 2013 2013 EV 9225 1331 0.14 2013 2013 UA 1317 177 0.13 2013 2013 AA 5354 620 0.12 2013 2013 9E 5520 593 0.11 2013 2013 WN 10968 1130 0.1 2013 2013 US 5722 549 0.1 2013 2013 DL 6313 478 0.08 2013 2013 FL 2433 205 0.08 Time taken: 2.036 seconds, Fetched 10 row(s)Here we cache partitions p2013 and p2014 in Spark. This retrieves the data from MySQL and loads it in Spark. After that all queries run on the cached data and will be much faster. 
With Scala we can cache the result of a query and then use filters to only get the information we need:val sqlDF = sql("SELECT flightdate, origin, dest, depdelayminutes, arrdelayminutes, carrier, TailNum, Cancelled, Diverted, Distance from ontime") sqlDF.cache().show() scala> sqlDF.filter("flightdate='1988-01-01'").count() res5: Long = 862 Using Spark with Percona XtraDB Cluster As Spark can be used in a cluster mode and scale with more and more nodes, reading data from a single MySQL is a bottleneck. We can use MySQL replication slave servers or Percona XtraDB Cluster (PXC) nodes as a Spark datasource. To test it out, I’ve provisioned Percona XtraDB Cluster with three nodes on AWS (I’ve used m4.2xlarge Ubuntu instances) and also started Apache Spark on each node: Node1 (pxc1): Percona Server + Spark Master + Spark worker node + Spark SQL running Node2 (pxc2): Percona Server + Spark worker node Node3 (pxc3): Percona Server + Spark worker node All the Spark worker nodes use the memory configuration option:cat conf/spark-env.sh export SPARK_WORKER_MEMORY=24gThen I can start spark-sql (also need to have connector/J JAR file copied to all nodes):$ ./bin/spark-sql --driver-memory 4G --master spark://pxc1:7077When creating a table, I still use localhost to connect to MySQL (url “jdbc:mysql://localhost:3306/ontime?user=root&password=xxx”). As Spark worker nodes are running on the same instance as Percona Cluster nodes, it will use the local connection. Then running a Spark SQL will evenly distribute all 26 MySQL queries among the three MySQL nodes. Alternatively we can run Spark cluster on a separate host and connect it to the HA Proxy, which in turn will load balance selects across multiple Percona XtraDB Cluster nodes. 
    Query Performance Benchmark

    Finally, here is the query response time test on the three AWS Percona XtraDB Cluster nodes:

    Query 1: select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;

        +--------------------------------+------------------+----------------------+-------------------+
        | Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Times Improvement |
        +--------------------------------+------------------+----------------------+-------------------+
        | No covered index (partitioned) | 19 min 16.58 sec | 192.17 sec           | 6.02              |
        | Covered index (partitioned)    | 2 min 10.81 sec  | 48.38 sec            | 2.7               |
        +--------------------------------+------------------+----------------------+-------------------+

    Query 2: select dayofweek, count(*) from ontime_part group by dayofweek;

        +--------------------------------+------------------+----------------------+-------------------+
        | Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Times Improvement |
        +--------------------------------+------------------+----------------------+-------------------+
        | No covered index (partitioned) | 19 min 15.21 sec | 195.058 sec          | 5.92              |
        | Covered index (partitioned)    | 1 min 10.38 sec  | 27.323 sec           | 2.58              |
        +--------------------------------+------------------+----------------------+-------------------+

    Now, this looks really good, but it can be better. With three nodes @ m4.2xlarge we will have 8*3 = 24 cores total (although they are shared between Spark and MySQL). We can expect a 10x improvement, especially without a covered index. However, on m4.2xlarge the amount of RAM did not allow me to run MySQL entirely from memory, so all reads were from EBS non-provisioned IOPS, which only gave me ~120MB/sec.
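The “Times Improvement” column above is simply the MySQL time divided by the Spark time. As a small sanity check, here is a sketch that converts the timing strings printed by the mysql client into seconds and recomputes the ratio. The helper names are made up for this example, and the parser only handles the “N min S sec” and “S sec” forms that appear in this post.

```python
import re


def mysql_time_to_seconds(text):
    """Convert a mysql client timing such as '19 min 16.58 sec' to seconds."""
    match = re.fullmatch(r"\s*(?:(\d+)\s*min\s+)?(\d+(?:\.\d+)?)\s*sec\s*", text)
    if match is None:
        raise ValueError(f"unrecognised timing: {text!r}")
    minutes = int(match.group(1) or 0)
    return minutes * 60 + float(match.group(2))


def times_improvement(mysql_time, spark_seconds):
    """Ratio of MySQL time to Spark time, rounded to two decimals."""
    return round(mysql_time_to_seconds(mysql_time) / spark_seconds, 2)


if __name__ == "__main__":
    # Query 1 on the three AWS nodes, without and with the covered index.
    print(times_improvement("19 min 16.58 sec", 192.17))
    print(times_improvement("2 min 10.81 sec", 48.38))
```

The same helper applied to the single-box spark-sql run reported earlier (19 min 16.58 sec in MySQL versus 139.628 seconds in Spark) gives a ratio of 8.28.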
    I’ve redone the test on a set of three dedicated servers:

      • 28 cores E5-2683 v3 @ 2.00GHz
      • 240GB of RAM
      • Samsung 850 PRO

    The test was running completely off RAM:

    Query 1 (from the above)

        +--------------------------------+------------------+----------------------+-------------------+
        | Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Times Improvement |
        +--------------------------------+------------------+----------------------+-------------------+
        | No covered index (partitioned) | 3 min 13.94 sec  | 14.255 sec           | 13.61             |
        | Covered index (partitioned)    | 2 min 2.11 sec   | 9.035 sec            | 13.52             |
        +--------------------------------+------------------+----------------------+-------------------+

    Query 2: select dayofweek, count(*) from ontime_part group by dayofweek;

        +--------------------------------+------------------+----------------------+-------------------+
        | Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Times Improvement |
        +--------------------------------+------------------+----------------------+-------------------+
        | No covered index (partitioned) | 2 min 0.36 sec   | 7.055 sec            | 17.06             |
        | Covered index (partitioned)    | 1 min 6.85 sec   | 4.514 sec            | 14.81             |
        +--------------------------------+------------------+----------------------+-------------------+

    With this number of cores and everything running from RAM, we actually do not have enough concurrency, as the table only has 26 partitions. I’ve also tried the unpartitioned table with the ID primary key, using 128 partitions.

    Note about partitioning

    I’ve used a partitioned table (partition by year) in my tests to help reduce MySQL level contention. At the same time, the “partitionColumn” option in Spark does not require the MySQL table to be partitioned. For example, if a table has a primary key, we can use this CREATE VIEW in Spark:

        CREATE OR REPLACE TEMPORARY VIEW ontime
        USING org.apache.spark.sql.jdbc
        OPTIONS (
          url "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
          dbtable "ontime.ontime",
          fetchSize "1000",
          partitionColumn "id", lowerBound "1", upperBound "162668934", numPartitions "128"
        );

    Assuming we have enough MySQL servers (i.e., nodes or slaves), we can increase the number of partitions and that can improve the parallelism (as opposed to only 26 partitions when running one partition per year). Actually, the above test gives us an even better response time: 6.44 seconds for query 1.

    Where Spark doesn’t work well

    For faster queries (those that use indexes or can efficiently use an index) it does not make sense to use Spark. Retrieving data from MySQL and loading it into Spark is not free. This overhead can be significant for faster queries.
    For example, a query like this:

        select count(*) from ontime_part where YearD = 2013 and DayOfWeek = 7 and OriginState = 'NC' and DestState = 'NC';

    will only scan 1300 rows and will return instantly (0.00 seconds reported by MySQL). An even better example is this: select max(id) from ontime_part. In MySQL, the query will use the index and all calculations will be done inside MySQL. Spark, on the other hand, will have to retrieve all IDs (select id from ontime_part) from MySQL and calculate the maximum. That took 24.267 seconds.

    Conclusion

    Using Apache Spark as an additional engine level on top of MySQL can help to speed up slow reporting queries and add much-needed scalability for long running select queries. In addition, Spark can help with query caching for frequent queries.

    PS: Visual explain plan with Spark

    The Spark Web GUI provides lots of ways of monitoring Spark jobs. For example, it shows the “job” progress and the SQL visual explain details.

  • Basically Shitty License
    Monty announced that he has created a new non-open source license called the "Business Source License" or BSL. I think it should have a different name...

    You see, Monty has fundamentally crafted a straw man to stand in for the general Open Source model by applying his experience in the dog-eat-dog world of forked software, in particular, the "ecosystem" of MySQL. The software that MariaDB draws the majority of their income from is MariaDB, which is a fork of MySQL. If you don't know the history, well, you see, Sun bought MySQL, Oracle bought Sun, and Monty, in an environment of nearly Biblical levels of FUD, forked MySQL into MariaDB (both products are named after his daughters).

    While MariaDB was originally envisioned as a "drop in/drop out" replacement, it has diverged so far from the Oracle product that it is no longer even "drop in" with the latest versions of MySQL. Oracle is adding amazing new features, like a real data dictionary (using InnoDB tables), improved partition management, group replication, etc., that MariaDB simply cannot compete with. Forking MySQL was a good business move for a time, but unfortunately, that time has passed. MariaDB is now obviously trying to compete with Oracle in different areas than MySQL server innovation.

    MariaDB Corporation's fork of InfiniDB (aka MariaDB Corporation ColumnStore [not to be confused with Microsoft ColumnStore indexes]) is one area where they are obviously trying to differentiate themselves, as well as MaxScale. I should note though, that even though MariaDB Corporation ColumnStore is GPL, MariaDB still says you must agree to another evaluation agreement to download the binaries. MaxScale is now BSL, which creates problems for the "ecosystem". The problem is that the "ecosystem" is toxic. A community cannot work when one of the members is actively poisoning the well. Generally when software is forked, the communities split, but due to the nature of MySQL that never happened.
    This is a toxic "open source" environment, where the "freedom" of open source is just a token gesture and only acts as a vehicle to more restrictive licenses (hosted tools in the case of Percona, non-open source software in the case of MariaDB). Part of the nature of the "ecosystem" is that the consulting companies (Oracle, MariaDB, Percona) could each support the "full stack" of software. If you bought Percona support, and there was a bug in MaxScale, Percona could fix it. Now, if you use more than three production instances you have to pay MariaDB a special fee to support just that part of your stack, and if you want Percona support for the rest, you have to pay for that too. That is harmful to the ecosystem. Monty just doesn't like fair competition. Surely forking MariaDB from MySQL was leveraging the freedom of open source, but now he eschews open source for (ahem) greener pastures.

    What is production?

    I define a production system as one that has passed through the beta phase. The beta phase is a phase of indeterminate length where the system as a whole, and all subsystems, are tested for correctness in all manners possible. Such a system does not have to be labeled as beta to the public, but only assigned such designation internally, since the public at large may find the term beta "off-putting", especially considering that new companies often face significant technical challenges when trying to scale a system, and such problems cannot be identified at low scale. Since any such system is subject to unexpected downtime, any site that declares itself beta should declare an uptime SLA which is less than or equal to "two nines".

    Only when a system has been thoroughly and publicly stressed at the scale it is intended to operate at can it be declared production. Once that happens, purchasing of licenses can happen.
